diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Cargo.lock | 30 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | common/mqtt_client/src/lib.rs | 8 | ||||
-rw-r--r-- | mapper/dm_agent/Cargo.toml | 27 | ||||
-rw-r--r-- | mapper/dm_agent/src/batcher.rs | 392 | ||||
-rw-r--r-- | mapper/dm_agent/src/collectd.rs | 250 | ||||
-rw-r--r-- | mapper/dm_agent/src/main.rs | 31 | ||||
-rw-r--r-- | mapper/dm_agent/src/monitor.rs | 50 | ||||
-rw-r--r-- | mapper/dm_agent/src/mqtt.rs | 86 | ||||
-rw-r--r-- | mapper/thin_edge_json/src/group.rs | 56 |
11 files changed, 932 insertions, 0 deletions
@@ -6,6 +6,7 @@ **/*.rs.bk .idea/ +*.iml .tmp/ .venv/ @@ -140,6 +140,17 @@ dependencies = [ ] [[package]] +name = "async-trait" +version = "0.1.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" +dependencies = [ + "proc-macro2 1.0.26", + "quote 1.0.9", + "syn 1.0.68", +] + +[[package]] name = "atty" version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1827,6 +1838,25 @@ dependencies = [ ] [[package]] +name = "tedge_dm_agent" +version = "0.1.0" +dependencies = [ + "anyhow", + "assert_matches", + "async-trait", + "chrono", + "futures", + "mockall", + "mqtt_client", + "thin_edge_json", + "thiserror", + "tokio", + "tokio-test", + "tracing", + "tracing-subscriber", +] + +[[package]] name = "tedge_mapper" version = "0.1.0" dependencies = [ @@ -8,6 +8,7 @@ members = [ "mapper/tedge_mapper", "mapper/cumulocity/c8y_translator_lib", "mapper/thin_edge_json", + "mapper/dm_agent" ] [profile.release] diff --git a/common/mqtt_client/src/lib.rs b/common/mqtt_client/src/lib.rs index 3dd05c3f..20f22d55 100644 --- a/common/mqtt_client/src/lib.rs +++ b/common/mqtt_client/src/lib.rs @@ -370,6 +370,14 @@ impl Config { } } + /// Update queue_capcity. + pub fn queue_capacity(self, queue_capacity: usize) -> Self { + Self { + queue_capacity, + ..self + } + } + /// Enable `clean_session`. pub fn clean_session(self) -> Self { Self { diff --git a/mapper/dm_agent/Cargo.toml b/mapper/dm_agent/Cargo.toml new file mode 100644 index 00000000..84371007 --- /dev/null +++ b/mapper/dm_agent/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "tedge_dm_agent" +version = "0.1.0" +edition = "2018" +authors = ["thin-edge.io team <info@thin-edge.io>"] +license = "Apache-2.0" +readme = "README.md" +description = "The daemon providing monitoring and remote management capabilities to tedge" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +thin_edge_json = {path = "../thin_edge_json" } +mqtt_client = {path = "../../common/mqtt_client" } +chrono = "0.4" +futures = "0.3" +tokio = { version = "1.1", features = ["rt", "sync", "time"] } +anyhow = "1.0" +thiserror = "1.0" +tracing = { version = "0.1", features = ["attributes", "log"] } +tracing-subscriber = "0.2" +mockall = "0.9" +async-trait = "0.1" + +[dev-dependencies] +assert_matches = "1.4" +tokio-test = "0.4.1" diff --git a/mapper/dm_agent/src/batcher.rs b/mapper/dm_agent/src/batcher.rs new file mode 100644 index 00000000..3cd7665d --- /dev/null +++ b/mapper/dm_agent/src/batcher.rs @@ -0,0 +1,392 @@ +use crate::mqtt::{MqttClient, MqttMessageStream}; +use mqtt_client::{Message, QoS, Topic, TopicFilter}; +use std::{sync::Arc, time::Duration}; +use thin_edge_json::{ + group::MeasurementGrouper, + measurement::current_timestamp, + measurement::FlatMeasurementVisitor, + serialize::{ThinEdgeJsonSerializationError, ThinEdgeJsonSerializer}, +}; +use tokio::{ + select, + sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}, + time::{interval, Interval}, +}; +use tracing::{error, log::warn}; + +use crate::collectd::{self, CollectdMessage}; + +const DEFAULT_STATS_COLLECTION_WINDOW: u64 = 1000; + +const SOURCE_TOPIC: &str = "collectd/#"; +const TARGET_TOPIC: &str = "tedge/measurements"; + +#[derive(thiserror::Error, Debug)] +pub enum DeviceMonitorError { + #[error(transparent)] + MqttClientError(#[from] Arc<mqtt_client::Error>), + + #[error(transparent)] + InvalidCollectdMeasurementError(#[from] collectd::CollectdError), + + #[error(transparent)] + InvalidThinEdgeJsonError(#[from] thin_edge_json::group::MeasurementGrouperError), + + #[error(transparent)] + ThinEdgeJsonSerializationError(#[from] ThinEdgeJsonSerializationError), + + #[error(transparent)] + BatchingError(#[from] SendError<MeasurementGrouper>), +} + +impl From<mqtt_client::Error> for DeviceMonitorError { + fn from(error: mqtt_client::Error) -> Self { + Self::MqttClientError(Arc::new(error)) + } +} + +#[derive(Debug)] +pub struct MessageBatch { + message_grouper: MeasurementGrouper, +} + +impl MessageBatch { + pub fn start_batch(message: Message) -> Result<Self, DeviceMonitorError> { + let mut message_grouper = MeasurementGrouper::new(); + message_grouper.timestamp(¤t_timestamp())?; + + let mut message_batch = Self { message_grouper }; + + message_batch.add_to_batch(message)?; + + Ok(message_batch) + } + + pub fn add_to_batch(&mut self, message: Message) -> Result<(), DeviceMonitorError> { + let collectd_message = CollectdMessage::parse_from(&message)?; + + self.message_grouper.measurement( + Some(collectd_message.metric_group_key), + collectd_message.metric_key, + collectd_message.metric_value, + )?; + + Ok(()) + } + + pub fn end_batch(self) -> MeasurementGrouper { + self.message_grouper + } +} + +pub struct MessageBatcher { + sender: UnboundedSender<MeasurementGrouper>, + mqtt_client: Arc<dyn MqttClient>, + topic_filter: TopicFilter, +} + +impl MessageBatcher { + pub fn new( + sender: UnboundedSender<MeasurementGrouper>, + mqtt_client: Arc<dyn MqttClient>, + ) -> Result<Self, DeviceMonitorError> { + let topic_filter = TopicFilter::new(SOURCE_TOPIC)?.qos(QoS::AtMostOnce); + Ok(Self { + sender, + mqtt_client, + topic_filter, + }) + } + + pub async fn run(&self) -> Result<(), DeviceMonitorError> { + let mut messages = self + .mqtt_client + .subscribe(self.topic_filter.clone()) + .await?; + + let batching_window = Duration::from_millis(DEFAULT_STATS_COLLECTION_WINDOW); + + loop { + match messages.next().await { + Some(message) => { + // Build a message batch until the batching window times out and return the batch + let message_batch_result = self + .build_message_batch_with_timeout( + message, + messages.as_mut(), + interval(batching_window), + ) + .await; + + match message_batch_result { + Ok(message_batch) => { + //Send the current batch to the batch processor + let _ = self.sender.send(message_batch).map_err(|err| { + error!("Error while publishing a message batch: {}", err) + }); + } + Err(err) => { + error!("Error while building a message batch: {}", err); + } + } + } + None => { + //If the message batching loop returns, it means the MQTT connection has closed + error!("MQTT connection closed. Retrying..."); + } + } + } + } + + async fn build_message_batch_with_timeout( + &self, + first_message: Message, + messages: &mut dyn MqttMessageStream, + mut timeout: Interval, + ) -> Result<MeasurementGrouper, DeviceMonitorError> { + let mut message_batch = MessageBatch::start_batch(first_message)?; + timeout.tick().await; // The first tick starts the timeout window + + loop { + select! { + maybe_message = messages.next() => { + match maybe_message { + Some(message) => message_batch.add_to_batch(message)?, + None => break + } + } + + _result = timeout.tick() => { + break; + } + } + } + + Ok(message_batch.end_batch()) + } +} + +pub struct MessageBatchPublisher { + receiver: UnboundedReceiver<MeasurementGrouper>, + mqtt_client: Arc<dyn MqttClient>, + topic: Topic, +} + +impl MessageBatchPublisher { + pub fn new( + receiver: UnboundedReceiver<MeasurementGrouper>, + mqtt_client: Arc<dyn MqttClient>, + ) -> Result<Self, DeviceMonitorError> { + let topic = Topic::new(TARGET_TOPIC)?; + + Ok(Self { + receiver, + mqtt_client, + topic, + }) + } + + pub async fn run(&mut self) { + while let Some(message_grouper) = self.receiver.recv().await { + if let Err(err) = self.publish_as_mqtt_message(message_grouper).await { + error!("Error publishing the measurement batch: {}", err); + } + } + + warn!("MQTT message channel closed. Can not proceed"); + } + + async fn publish_as_mqtt_message( + &mut self, + message_grouper: MeasurementGrouper, + ) -> Result<(), DeviceMonitorError> { + let mut tedge_json_serializer = ThinEdgeJsonSerializer::new(); + message_grouper.accept(&mut tedge_json_serializer)?; + + let tedge_message = Message::new(&self.topic, tedge_json_serializer.bytes()?); + + self.mqtt_client.publish(tedge_message).await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use crate::mqtt::MockMqttClient; + use crate::mqtt::MockMqttErrorStream; + use crate::mqtt::MockMqttMessageStream; + use assert_matches::assert_matches; + use futures::future::{pending, ready}; + use mockall::Sequence; + use tokio::time::sleep; + + use super::*; + + #[test] + fn test_message_batch_processor() -> anyhow::Result<()> { + let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); + let collectd_message = Message::new(&topic, "123456789:32.5"); + let mut message_batch = MessageBatch::start_batch(collectd_message)?; + + let topic = Topic::new("collectd/localhost/coordinate/x").unwrap(); + let collectd_message = Message::new(&topic, "123456789:50"); + message_batch.add_to_batch(collectd_message)?; + + let topic = Topic::new("collectd/localhost/coordinate/y").unwrap(); + let collectd_message = Message::new(&topic, "123456789:70"); + message_batch.add_to_batch(collectd_message)?; + + let topic = Topic::new("collectd/localhost/pressure/value").unwrap(); + let collectd_message = Message::new(&topic, "123456789:98.2"); + message_batch.add_to_batch(collectd_message)?; + + let topic = Topic::new("collectd/localhost/coordinate/z").unwrap(); + let collectd_message = Message::new(&topic, "123456789:90"); + message_batch.add_to_batch(collectd_message)?; + + let message_grouper = message_batch.end_batch(); + + assert_matches!(message_grouper.timestamp, Some(_)); + + assert_eq!( + message_grouper.get_measurement_value(Some("temperature"), "value"), + Some(32.5) + ); + assert_eq!( + message_grouper.get_measurement_value(Some("pressure"), "value"), + Some(98.2) + ); + assert_eq!( + message_grouper.get_measurement_value(Some("coordinate"), "x"), + Some(50.0) + ); + assert_eq!( + message_grouper.get_measurement_value(Some("coordinate"), "y"), + Some(70.0) + ); + assert_eq!( + message_grouper.get_measurement_value(Some("coordinate"), "z"), + Some(90.0) + ); + + Ok(()) + } + + #[test] + fn invalid_collectd_message_format() { + let topic = Topic::new("collectd/host/group/key").unwrap(); + let invalid_collectd_message = Message::new(&topic, "123456789"); + let result = MessageBatch::start_batch(invalid_collectd_message); + + assert_matches!( + result, + Err(DeviceMonitorError::InvalidCollectdMeasurementError(_)) + ); + } + + #[tokio::test] + async fn batch_publisher() { + let mut message_grouper = MeasurementGrouper::new(); + message_grouper + .measurement(Some("temperature"), "value", 32.5) + .unwrap(); + + let (_sender, receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGrouper>(); + + let mut mqtt_client = MockMqttClient::new(); + mqtt_client.expect_publish().times(1).returning(|message| { + assert_eq!(message.topic.name, TARGET_TOPIC.to_string()); //The test assertion happens here + Ok(123) + }); + + let mut publisher = MessageBatchPublisher::new(receiver, Arc::new(mqtt_client)).unwrap(); + publisher + .publish_as_mqtt_message(message_grouper) + .await + .unwrap(); + } + + //TODO Control the timeout better with mocked clocks + #[tokio::test] + async fn batching_with_window_timeout() { + let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGrouper>(); + + let mut mqtt_client = MockMqttClient::new(); + + mqtt_client.expect_subscribe_errors().returning(|| { + let error_stream = MockMqttErrorStream::default(); + Box::new(error_stream) + }); + + mqtt_client.expect_subscribe().returning(|_| { + let message_stream = MockMqttMessageStream::default(); + Ok(Box::new(message_stream)) + }); + + let mut seq = Sequence::new(); //To control the order of mock returns + let mut message_stream = MockMqttMessageStream::default(); + message_stream + .expect_next() + .times(1) + .in_sequence(&mut seq) //The first value to be returned by this mock stream + .returning(|| { + let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); + let message = Message::new(&topic, "123456789:32.5"); + Box::pin(ready(Some(message))) + }); + + message_stream + .expect_next() + .times(1) + .in_sequence(&mut seq) //The second value to be returend by this mock stream + .returning(|| { + let topic = Topic::new("collectd/localhost/pressure/value").unwrap(); + let message = Message::new(&topic, "123456789:98.2"); + Box::pin(ready(Some(message))) + }); + + //The third message from this stream will be returned only after the batching window + message_stream + .expect_next() + .times(1) + .in_sequence(&mut seq) //The third value to be returend by this mock stream + .returning(|| { + Box::pin(async { + sleep(Duration::from_millis(1000)).await; //Sleep for a duration greater than the batching window + let topic = Topic::new("collectd/localhost/speed/value").unwrap(); + let message = Message::new(&topic, "123456789:350"); + Some(message) + }) + }); + + //Block the stream from the 4th invocation onwards + message_stream + .expect_next() + .returning(|| Box::pin(pending())); //Block the stream with a pending future + + let mut timeout = interval(Duration::from_millis(500)); + timeout.tick().await; // The first tick starts the timeout window + + let first_message = message_stream.next().await.unwrap(); + let builder = MessageBatcher::new(sender, Arc::new(mqtt_client)).unwrap(); + let message_grouper = builder + .build_message_batch_with_timeout(first_message, &mut message_stream, timeout) + .await + .unwrap(); + + assert_eq!( + message_grouper.get_measurement_value(Some("temperature"), "value"), + Some(32.5) + ); + assert_eq!( + message_grouper.get_measurement_value(Some("pressure"), "value"), + Some(98.2) + ); + assert_eq!( + message_grouper.get_measurement_value(Some("speed"), "value"), + None //This measurement isn't included in the batch because it came after the batching window + ); + } +} diff --git a/mapper/dm_agent/src/collectd.rs b/mapper/dm_agent/src/collectd.rs new file mode 100644 index 00000000..ad87e94c --- /dev/null +++ b/mapper/dm_agent/src/collectd.rs @@ -0,0 +1,250 @@ +use std::str::from_utf8; + +use mqtt_client::Message; + +#[derive(Debug)] +pub struct CollectdMessage<'a> { + pub metric_group_key: &'a str, + pub metric_key: &'a str, + pub metric_value: f64, +} + +#[derive(thiserror::Error, Debug)] +pub enum CollectdError { + #[error( + "Message received on invalid collectd topic: {0}. \ + Collectd message topics must be in the format collectd/<hostname>/<metric-plugin-name>/<metric-key>" + )] + InvalidMeasurementTopic(String), + + #[error("Invalid payload received on topic: {0}. Error: {1}")] + InvalidMeasurementPayload(String, CollectdPayloadError), +} + +impl<'a> CollectdMessage<'a> { + pub fn parse_from(mqtt_message: &'a Message) -> Result<Self, CollectdError> { + let topic = mqtt_message.topic.name.as_str(); + let collectd_topic = match CollectdTopic::from_str(topic) { + Ok(collectd_topic) => collectd_topic, + Err(_) => { + return Err(CollectdError::InvalidMeasurementTopic(topic.into())); + } + }; + + let collectd_payload = CollectdPayload::parse_from(mqtt_message.payload.as_slice()) + .map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?; + + Ok(CollectdMessage { + metric_group_key: collectd_topic.metric_group_key, + metric_key: collectd_topic.metric_key, + metric_value: collectd_payload.metric_value, + }) + } +} + +#[derive(Debug)] +struct CollectdTopic<'a> { + metric_group_key: &'a str, + metric_key: &'a str, +} + +#[derive(Debug)] +struct InvalidCollectdTopicName; + +impl<'a> CollectdTopic<'a> { + fn from_str(topic_name: &'a str) -> Result<Self, InvalidCollectdTopicName> { + let mut iter = topic_name.split('/'); + let _collectd_prefix = iter.next().ok_or(InvalidCollectdTopicName)?; + let _hostname = iter.next().ok_or(InvalidCollectdTopicName)?; + let metric_group_key = iter.next().ok_or(InvalidCollectdTopicName)?; + let metric_key = iter.next().ok_or(InvalidCollectdTopicName)?; + + match iter.next() { + None => Ok(CollectdTopic { + metric_group_key, + metric_key, + }), + Some(_) => Err(InvalidCollectdTopicName), + } + } +} + +#[derive(Debug)] +struct CollectdPayload { + _timestamp: f64, + metric_value: f64, +} + +#[derive(thiserror::Error, Debug)] +pub enum CollectdPayloadError { + #[error("Non UTF-8 payload: {0:?}")] + NonUTF8MeasurementPayload(Vec<u8>), + + #[error("Invalid payload: {0}. Expected payload format: <timestamp>:<value>")] + InvalidMeasurementPayloadFormat(String), + + #[error("Invalid measurement timestamp: {0}. Epoch time value expected")] + InvalidMeasurementTimestamp(String), + + #[error("Invalid measurement value: {0}. Must be a number")] + InvalidMeasurementValue(String), +} + +impl CollectdPayload { + fn parse_from(payload: &[u8]) -> Result<Self, CollectdPayloadError> { + let payload = from_utf8(payload) + .map_err(|_err| CollectdPayloadError::NonUTF8MeasurementPayload(payload.into()))?; + let mut iter = payload.split(':'); + + let _timestamp = + iter.next() + .ok_or(CollectdPayloadError::InvalidMeasurementPayloadFormat( + payload.to_string(), + ))?; + let _timestamp = _timestamp.parse::<f64>().map_err(|_err| { + CollectdPayloadError::InvalidMeasurementTimestamp(_timestamp.to_string()) + })?; + + let metric_value = + iter.next() + .ok_or(CollectdPayloadError::InvalidMeasurementPayloadFormat( + payload.to_string(), + ))?; + let metric_value = metric_value + .trim_end_matches(char::from(0)) //Trim \u{0} character from the end of the MQTT payload + .parse::<f64>() + .map_err(|_err| { + CollectdPayloadError::InvalidMeasurementValue(metric_value.to_string()) + })?; + + match iter.next() { + None => Ok(CollectdPayload { + _timestamp, + metric_value, + }), + Some(_) => Err(CollectdPayloadError::InvalidMeasurementPayloadFormat( + payload.to_string(), + )), + } + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use mqtt_client::Topic; + + use super::*; + + #[test] + fn collectd_message_parsing() { + let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); + let mqtt_message = Message::new(&topic, "123456789:32.5"); + + let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); + + let CollectdMessage { + metric_group_key, + metric_key, + metric_value, + } = collectd_message; + + assert_eq!(metric_group_key, "temperature"); + assert_eq!(metric_key, "value"); + assert_eq!(metric_value, 32.5); + } + + #[test] + fn invalid_collectd_message_topic() { + let topic = Topic::new("collectd/less/level").unwrap(); + let mqtt_message = Message::new(&topic, "123456789:32.5"); + + let result = CollectdMessage::parse_from(&mqtt_message); + + assert_matches!(result, Err(CollectdError::InvalidMeasurementTopic(_))); + } + + #[test] + fn invalid_collectd_message_payload() { + let topic = Topic::new("collectd/host/group/key").unwrap(); + let invalid_collectd_message = Message::new(&topic, "123456789"); + + let result = CollectdMessage::parse_from(&invalid_collectd_message); + + assert_matches!(result, Err(CollectdError::InvalidMeasurementPayload(_, _))); + } + + #[test] + fn invalid_collectd_topic_less_levels() { + let result = CollectdTopic::from_str("collectd/less/levels"); + + assert_matches!(result, Err(InvalidCollectdTopicName)); + } + + #[test] + fn invalid_collectd_topic_more_levels() { + let result = CollectdTopic::from_str("collectd/more/levels/than/needed"); + + assert_matches!(result, Err(InvalidCollectdTopicName)); + } + + #[test] + fn invalid_collectd_payload_no_seperator() { + let payload: Vec<u8> = "123456789".into(); + let result = CollectdPayload::parse_from(&payload); + + assert_matches!( + result, + Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_)) + ); + } + + #[test] + fn invalid_collectd_payload_more_seperators() { + let payload: Vec<u8> = "123456789:98.6:abc".into(); + let result = CollectdPayload::parse_from(&payload); + + assert_matches!( + result, + Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_)) + ); + } + + #[test] + fn invalid_collectd_metric_value() { + let payload: Vec<u8> = "123456789:abc".into(); + let result = CollectdPayload::parse_from(&payload); + + assert_matches!( + result, + Err(CollectdPayloadError::InvalidMeasurementValue(_)) + ); + } + + #[test] + fn invalid_collectd_metric_timestamp() { + let payload: Vec<u8> = "abc:98.6".into(); + let result = CollectdPayload::parse_from(&payload); + + assert_matches!( + result, + Err(CollectdPayloadError::InvalidMeasurementTimestamp(_)) + ); + } + + #[test] + fn very_large_metric_value() { + let payload: Vec<u8> = format!("123456789:{}", u128::MAX).into(); + let collectd_payload = CollectdPayload::parse_from(&payload).unwrap(); + + assert_eq!(collectd_payload.metric_value, u128::MAX as f64); + } + + #[test] + fn very_small_metric_value() { + let payload: Vec<u8> = format!("123456789:{}", i128::MIN).into(); + let collectd_payload = CollectdPayload::parse_from(&payload).unwrap(); + + assert_eq!(collectd_payload.metric_value, i128::MIN as f64); + } +} diff --git a/mapper/dm_agent/src/main.rs b/mapper/dm_agent/src/main.rs new file mode 100644 index 00000000..b1f19827 --- /dev/null +++ b/mapper/dm_agent/src/main.rs @@ -0,0 +1,31 @@ +mod batcher; +mod collectd; +mod monitor; +mod mqtt; + +use tracing::{debug_span, info, Instrument}; + +use crate::monitor::DeviceMonitor; + +const APP_NAME: &str = "tedge-dm-agent"; +const DEFAULT_LOG_LEVEL: &str = "warn"; +const TIME_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.3f%:z"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| DEFAULT_LOG_LEVEL.into()); + tracing_subscriber::fmt() + .with_timer(tracing_subscriber::fmt::time::ChronoUtc::with_format( + TIME_FORMAT.into(), + )) + .with_env_filter(filter) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .init(); + + info!("{} starting!", APP_NAME); + DeviceMonitor::run() + .instrument(debug_span!(APP_NAME)) + .await?; + + Ok(()) +} diff --git a/mapper/dm_agent/src/monitor.rs b/mapper/dm_agent/src/monitor.rs new file mode 100644 index 00000000..5fde4ac8 --- /dev/null +++ b/mapper/dm_agent/src/monitor.rs @@ -0,0 +1,50 @@ +use thin_edge_json::group::MeasurementGrouper; +use tracing::{instrument, log::error}; + +use crate::{ + batcher::{DeviceMonitorError, MessageBatchPublisher, MessageBatcher}, + mqtt::{MqttClient, MqttClientImpl}, +}; + +const DEFAULT_HOST: &str = "localhost"; +const DEFAULT_PORT: u16 = 1883; +const CLIENT_ID: &str = "tedge-dm-agent"; + +#[derive(Debug)] +pub struct DeviceMonitor; + +impl DeviceMonitor { + #[instrument(name = "monitor")] + pub async fn run() -> Result<(), DeviceMonitorError> { + let config = mqtt_client::Config::new(DEFAULT_HOST, DEFAULT_PORT).queue_capacity(1024); + let mqtt_client = MqttClientImpl::connect(CLIENT_ID, &config).await?; + + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGrouper>(); + + let message_batch_producer = MessageBatcher::new(sender, mqtt_client.clone())?; + let join_handle1 = tokio::task::spawn(async move { + match message_batch_producer.run().await { + Ok(_) => error!("Unexpected end of message batcher thread"), + Err(err) => error!("Error in message batcher thread: {}", err), + } + }); + + let mut message_batch_consumer = MessageBatchPublisher::new(receiver, mqtt_client.clone())?; + let join_handle2 = tokio::task::spawn(async move { + message_batch_consumer.run().await; + }); + + let mut errors = mqtt_client.subscribe_errors(); + let join_handle3 = tokio::task::spawn(async move { + while let Some(error) = errors.next().await { + error!("MQTT error: {}", error); + } + }); + + let _ = join_handle1.await; + let _ = join_handle2.await; + let _ = join_handle3.await; + + Ok(()) + } +} diff --git a/mapper/dm_agent/src/mqtt.rs b/mapper/dm_agent/src/mqtt.rs new file mode 100644 index 00000000..0d18be41 --- /dev/null +++ b/mapper/dm_agent/src/mqtt.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use mockall::automock; +use mqtt_client::{Client, ErrorStream, Message, MessageId, MessageStream, TopicFilter}; + +#[automock] +#[async_trait] +pub trait MqttClient: Send + Sync { + fn subscribe_errors(&self) -> Box<dyn MqttErrorStream>; + + async fn subscribe( + &self, + filter: TopicFilter, + ) -> Result<Box<dyn MqttMessageStream>, mqtt_client::Error>; + + async fn publish(&self, message: Message) -> Result<MessageId, mqtt_client::Error>; +} + +#[async_trait] +#[automock] +pub trait MqttMessageStream: Send + Sync { + async fn next(&mut self) -> Option<Message>; +} + +pub struct MqttMessageStreamImpl { + message_stream: MessageStream, +} + +#[async_trait] +impl MqttMessageStream for MqttMessageStreamImpl { + async fn next(&mut self) -> Option<Message> { + self.message_stream.next().await + } +} + +#[automock] +#[async_trait] +pub trait MqttErrorStream: Send + Sync { + async fn next(&mut self) -> Option<Arc<mqtt_client::Error>>; |