1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
use futures::future::FutureExt;
use futures::select;
use futures_timer::Delay;
use log::debug;
use log::error;
use log::info;
use mqtt_client::Config;
use mqtt_client::Message;
use mqtt_client::Topic;
use mqtt_client::{Client, ErrorStream, MessageStream};
use rand::prelude::*;
use std::time::Duration;
// Template id looked for in incoming command payloads to detect a restart request.
// NOTE(review): presumably the Cumulocity SmartREST static template for the
// "restart" operation — confirm against the C8Y documentation.
const C8Y_TEMPLATE_RESTART: &str = "510";
// Template id placed in front of every published temperature value.
// NOTE(review): presumably the Cumulocity SmartREST static template for a
// temperature measurement — confirm against the C8Y documentation.
const C8Y_TEMPLATE_TEMPERATURE: &str = "211";
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
let c8y_msg = Topic::new("c8y/s/us")?;
let c8y_cmd = Topic::new("c8y/s/ds")?;
let c8y_err = Topic::new("c8y/s/e")?;
init_logger();
let mqtt = Client::connect("temperature", &Config::default()).await?;
let commands = mqtt.subscribe(c8y_cmd.filter()).await?;
let c8y_errors = mqtt.subscribe(c8y_err.filter()).await?;
let errors = mqtt.subscribe_errors();
tokio::spawn(publish_temperature(mqtt, c8y_msg));
select! {
_ = listen_command(commands).fuse() => (),
_ = listen_c8y_error(c8y_errors).fuse() => (),
_ = listen_error(errors).fuse() => (),
}
Ok(())
}
/// Publishes a short series of simulated temperature readings, then disconnects.
///
/// The reading starts at a random value in `-10..20` and drifts by a random
/// step in `-1..2` before each of the nine publications on `c8y_msg`. Each
/// publication is followed by a one-second pause.
///
/// # Errors
///
/// Returns the first error reported by `publish` or by `disconnect`.
async fn publish_temperature(mqtt: Client, c8y_msg: Topic) -> Result<(), mqtt_client::Error> {
    let pause = Duration::from_secs(1);
    let mut reading: i32 = random_in_range(-10, 20);

    info!("Publishing temperature measurements");

    for _ in 0..9 {
        reading += random_in_range(-1, 2);

        let line = format!("{},{}", C8Y_TEMPLATE_TEMPERATURE, reading);
        debug!("{}", line);

        let measurement = Message::new(&c8y_msg, line);
        mqtt.publish(measurement).await?;
        Delay::new(pause).await;
    }

    mqtt.disconnect().await?;
    Ok(())
}
/// Draws a random integer from the half-open range `low..high`
/// using the thread-local random number generator.
fn random_in_range(low: i32, high: i32) -> i32 {
    thread_rng().gen_range(low..high)
}
/// Consumes incoming C8Y command messages until a restart is requested or the
/// stream ends.
///
/// Every payload is logged at debug level. Payloads that are not valid UTF-8
/// are otherwise ignored.
async fn listen_command(mut messages: MessageStream) {
    while let Some(message) = messages.next().await {
        debug!("C8Y command: {:?}", message.payload);
        if let Ok(cmd) = std::str::from_utf8(&message.payload) {
            if is_restart_request(cmd) {
                info!("Stopping on remote request ... should be restarted by the daemon monitor.");
                break;
            }
        }
    }
}

/// Returns `true` when any line of `payload` has the restart template id as
/// its first comma-separated field.
///
/// Matching on the leading field rather than on a substring avoids treating
/// unrelated messages that merely contain the digits of the template id
/// (for example inside a device name) as restart requests.
fn is_restart_request(payload: &str) -> bool {
    payload
        .lines()
        .any(|line| line.split(',').next().map(str::trim) == Some(C8Y_TEMPLATE_RESTART))
}
async fn listen_c8y_error(mut messages: MessageStream) {
while let Some(message) = messages.next().await {
error!("C8Y error: {:?}", message.payload);
}
}
async fn listen_error(mut errors: ErrorStream) {
while let Some(error) = errors.next().await {
error!("System error: {}", error);
}
}
/// Starts logging: an `env_logger` logger created with `from_default_env` is
/// wrapped by `async_log` and started with the `Trace` level filter.
///
/// # Panics
///
/// Panics if starting the wrapped logger returns an error.
fn init_logger() {
    // The same constant task id is reported for every call of the closure.
    let fixed_task_id = 1;
    let current_task_id = move || fixed_task_id;

    let env_backend = env_logger::Logger::from_default_env();
    async_log::Logger::wrap(env_backend, current_task_id)
        .start(log::LevelFilter::Trace)
        .unwrap();
}
|