summaryrefslogtreecommitdiffstats
path: root/crates/tests/sawtooth_publisher/src/main.rs
blob: 527eefd3fecb348a5566aefc2efed6feb979d104 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
use futures::future::FutureExt;
use futures::select;
use futures_timer::Delay;
use log::debug;
use log::error;
use log::info;
use mqtt_channel::{
    Config, Connection, ErrChannel, Message, MqttError, PubChannel, SubChannel, Topic, TopicFilter,
};
use std::convert::TryFrom;
use std::env;
use std::fmt::Write as _;
use std::io::Write;
use std::process;
use std::time::{Duration, Instant};

/*

This is a small and flexible publisher for deterministic test data.

- TODO: Improve code quality
- TODO: Add different data types for JSON publishing
- TODO: Command line switch to switch between REST and JSON
- TODO: Currently REST sending is disabled and JSON publishing is enabled
- TODO: Add QoS selection
*/

// Templates:
// https://cumulocity.com/guides/10.4.6/device-sdk/mqtt/
//
// Create custom measurement (200)
// Create signal strength measurement (210)
// Create temperature measurement (211)
// Create battery measurement (212)

// sawtooth_publisher <wait_time_ms> <height> <iterations> <template>
//
// cargo run sawtooth_publisher 100 100 100 flux
// cargo run sawtooth_publisher 1000 10 10 sawmill

/// Publishes a deterministic sawtooth signal over MQTT and reports the achieved publish rate.
///
/// Expects exactly four command line arguments:
/// `<wait_time_ms> <height> <iterations> <template: sawmill|flux>`.
///
/// The selected publisher runs as a detached task while this function listens on the
/// `c8y/s/e` subscription and on the connection's error channel. Once either listener
/// finishes, the elapsed time and the publish rate are printed.
///
/// # Panics
///
/// Panics when the number of arguments is wrong, when a numeric argument cannot be parsed or
/// is negative, or when the template is neither `flux` nor `sawmill`.
///
/// # Errors
///
/// Returns an error when the topic or topic filter cannot be created or when the MQTT
/// connection cannot be established.
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    // Program name plus: wait time, tooth height, iterations, template.
    if args.len() != 5 {
        println!("Usage: sawtooth_publisher <wait_time_ms> <height> <iterations> <template: sawmill|flux>");
        panic!("Error: Wrong number of command line arguments");
    }
    let wait: i32 = args[1].parse().expect("Cannot parse wait time");
    let height: i32 = args[2].parse().expect("Cannot parse height");
    let iterations: i32 = args[3].parse().expect("Cannot parse iterations");
    let template = args[4].as_str();

    // Reject negative values here: a negative wait would otherwise only fail inside the
    // detached publisher task, and negative counts make the reported rate meaningless.
    if wait < 0 || height < 0 || iterations < 0 {
        panic!("Error: wait time, height and iterations must not be negative");
    }

    // Widen before multiplying so that large arguments cannot overflow `i32`.
    let total_publishes = i64::from(height) * i64::from(iterations);

    println!(
        "Publishing sawtooth with delay {}ms height {} iterations {} template {} will cause {} publishs.",
        wait,
        height,
        iterations,
        template,
        total_publishes
    );
    let c8y_msg = Topic::new("tedge/measurements")?;
    let c8y_err = TopicFilter::new("c8y/s/e")?;

    init_logger();

    // The process id keeps the session name distinct between concurrently running publishers.
    let name = "sawtooth_".to_string() + &process::id().to_string();
    let config = Config::default()
        .with_clean_session(true)
        .with_session_name(name)
        .with_subscriptions(c8y_err);
    let mqtt = Connection::new(&config).await?;

    let mqtt_pub_channel = mqtt.published;
    let c8y_errors = mqtt.received;
    let errors = mqtt.errors;

    let start = Instant::now();

    // The join handle is dropped, so the publisher runs detached and its `Result` is never
    // inspected here.
    match template {
        "flux" => {
            tokio::spawn(publish_topic(
                mqtt_pub_channel,
                c8y_msg,
                wait,
                height,
                iterations,
            ));
        }
        "sawmill" => {
            tokio::spawn(publish_multi_topic(
                mqtt_pub_channel,
                c8y_msg,
                wait,
                height,
                iterations,
            ));
        }
        _ => {
            println!("Wrong template");
            panic!("Exiting");
        }
    }

    // Completes as soon as one of the two listeners returns.
    // NOTE(review): presumably both streams end once the publisher closes the connection —
    // confirm against mqtt_channel.
    select! {
        _ = listen_c8y_error(c8y_errors).fuse() => (),
        _ = listen_error(errors).fuse() => (),
    }

    let elapsed = start.elapsed();
    // `subsec_millis` is the remainder after the whole seconds, so the two numbers add up
    // to the elapsed time instead of repeating it.
    println!(
        "Execution took {} s {} ms",
        elapsed.as_secs(),
        elapsed.subsec_millis()
    );

    if total_publishes > 0 {
        let elapsed_ms = elapsed.as_secs_f64() * 1000.0;
        let rate = elapsed_ms / (f64::from(height) * f64::from(iterations));
        let pubpersec = 1000.0 / rate;
        println!("Publish rate: {:.3} ms/pub", rate);
        println!("Publish per second: {:.3} pub/s", pubpersec);
    } else {
        // Avoids dividing by zero when height or iterations is zero.
        println!("Nothing was published, so no publish rate is available");
    }

    Ok(())
}

/// Publishes the single-series "Flux" sawtooth on `c8y_msg`.
///
/// For each of the `iterations` rounds the values `0..height` are published one by one as
/// `{ "Flux [F]": <value> }`, pausing `wait` milliseconds after every message. A negative
/// `wait` is treated as zero. After the last round the channel is closed; an error from
/// `close` is ignored.
///
/// # Errors
///
/// Returns the error from `publish` as soon as a message cannot be published; `close` is not
/// called in that case.
async fn publish_topic(
    mut mqtt: impl PubChannel,
    c8y_msg: Topic,
    wait: i32,
    height: i32,
    iterations: i32,
) -> Result<(), MqttError> {
    info!("Publishing temperature measurements");
    println!();
    // Convert once, before anything is published: the pause is loop-invariant, and a
    // negative value must not abort the task halfway through the run.
    let pause = Duration::from_millis(u64::try_from(wait).unwrap_or(0));
    for iteration in 0..iterations {
        for value in 0..height {
            let payload = format!("{{ {}: {} }}", "\"Flux [F]\"", value);
            debug!("{} ", value);
            debug!("{}", payload);

            mqtt.publish(Message::new(&c8y_msg, payload)).await?;
            Delay::new(pause).await;
            std::io::stdout().flush().expect("Flush failed");
        }
        println!("Iteraton: {}", iteration);
    }
    println!();

    // Best effort: a failure to close is deliberately ignored.
    let _ = mqtt.close().await;
    Ok(())
}

/// Publishes the ten-series "Sawmill" sawtooth on `c8y_msg`.
///
/// For each of the `iterations` rounds one message per value in `0..height` is published,
/// shaped as `{ "Sawmill [S]": { "saw_0": v0 ,"saw_1": v1 ,... } }`, pausing `wait`
/// milliseconds after every message. A negative `wait` is treated as zero. After the last
/// round the channel is closed; an error from `close` is ignored.
///
/// # Errors
///
/// Returns the error from `publish` as soon as a message cannot be published; `close` is not
/// called in that case.
async fn publish_multi_topic(
    mut mqtt: impl PubChannel,
    c8y_msg: Topic,
    wait: i32,
    height: i32,
    iterations: i32,
) -> Result<(), MqttError> {
    info!("Publishing temperature measurements");
    println!();
    let series_name = "\"Sawmill [S]\"";
    // Convert once, before anything is published: the pause is loop-invariant, and a
    // negative value must not abort the task halfway through the run.
    let pause = Duration::from_millis(u64::try_from(wait).unwrap_or(0));
    for iteration in 0..iterations {
        for value in 0..height {
            let series = sawmill_series(value, height);

            let payload = format!("{{ {}: {{ {} }} }}", series_name, series);
            debug!("{} ", value);
            debug!("{}", payload);

            mqtt.publish(Message::new(&c8y_msg, payload)).await?;

            Delay::new(pause).await;
            std::io::stdout().flush().expect("Flush failed");
        }
        println!("Iteraton: {}", iteration);
    }
    println!();

    // Best effort: a failure to close is deliberately ignored.
    let _ = mqtt.close().await;
    Ok(())
}

/// Builds the comma-separated `"saw_<n>": <value> ` members of one sawmill message.
///
/// Series `n` carries `value` shifted by `n * height / 10`, wrapped into `0..height`.
/// `height` must be non-zero; the caller only invokes this for `value` in `0..height`.
fn sawmill_series(value: i32, height: i32) -> String {
    const SERIES_COUNT: i64 = 10;
    // Widen to i64 so that `s * height` cannot overflow for large heights.
    let (value, height) = (i64::from(value), i64::from(height));

    let mut series = String::new();
    for s in 0..SERIES_COUNT {
        // Formatting into a `String` does not fail, so the result is ignored.
        let _ = write!(
            series,
            "\"saw_{}\": {} ,",
            s,
            (value + s * height / SERIES_COUNT) % height
        );
    }
    // Every member was written with a trailing comma; drop the one after the last member.
    let trimmed_len = series.trim_end_matches(',').len();
    series.truncate(trimmed_len);
    series
}

async fn listen_c8y_error(mut messages: impl SubChannel) {
    let mut count: u32 = 0;
    while let Some(message) = messages.next().await {
        error!("C8Y error: {:?}", message.payload_str());
        if count >= 3 {
            panic!("Panic!");
        }
        count += 1;
    }
}

async fn listen_error(mut errors: impl ErrChannel) {
    let mut count: u32 = 0;
    while let Some(error) = errors.next().await {
        error!("System error: {}", error);
        if count >= 3 {
            panic!("Panic!");
        }
        count += 1;
    }
}

/// Installs the process-wide logger.
///
/// An `env_logger` built from the default environment is wrapped in an `async_log` logger
/// that reports the constant task id `1`, and started with the `Trace` level filter.
///
/// # Panics
///
/// Panics when starting the logger fails.
fn init_logger() {
    let backend = env_logger::Logger::from_default_env();
    let wrapped = async_log::Logger::wrap(backend, || 1);

    wrapped.start(log::LevelFilter::Trace).unwrap();
}