diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-04-19 19:26:37 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-04-22 08:42:39 +0200 |
commit | 356725a22c29035396e0bcbf5e89ff49839e1e56 (patch) | |
tree | 05bd2f2e1ca9e094b830ce9b49739496fa93cb90 | |
parent | 6795e4afbc0c331e7dde453e03f9ae0286767d54 (diff) |
Support receiving messages from the cloud
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r-- | Cargo.lock | 61 | ||||
-rw-r--r-- | plugins/plugin_azure_bridge/src/lib.rs | 101 |
2 files changed, 118 insertions, 44 deletions
@@ -2031,19 +2031,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] -name = "openssl-sys" -version = "0.9.72" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" -dependencies = [ - "autocfg", - "cc", - "libc", - "pkg-config", - "vcpkg", -] - -[[package]] name = "openssl" version = "0.10.38" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2064,12 +2051,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] +name = "openssl-sys" +version = "0.9.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] name = "os_str_bytes" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" dependencies = [ -"memchr", + "memchr", ] [[package]] @@ -2078,10 +2078,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53cdc5b785b7a58c5aad8216b3dfa114df64b0b06ae6e1501cef91df2fbdf8f9" dependencies = [ -"winapi", + "winapi", ] [[package]] +name = "owo-colors" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e72e30578e0d0993c8ae20823dd9cff2bc5517d2f586a8aef462a581e8a03eb" + +[[package]] name = "paho-mqtt" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2108,12 +2114,6 @@ dependencies = [ ] [[package]] -name = "owo-colors" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e72e30578e0d0993c8ae20823dd9cff2bc5517d2f586a8aef462a581e8a03eb" - -[[package]] name = "parking_lot" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3340,24 +3340,6 @@ dependencies = [ ] [[package]] -name = "supports-hyperlinks" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "590b34f7c5f01ecc9d78dba4b3f445f31df750a67621cf31626f3b7441ce6406" -dependencies = [ - "atty", -] - -[[package]] -name = "supports-unicode" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8b945e45b417b125a8ec51f1b7df2f8df7920367700d1f98aedd21e5735f8b2" -dependencies = [ - "atty", -] - -[[package]] name = "subtle" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3473,7 +3455,6 @@ dependencies = [ "env_logger 0.9.0", "miette", "plugin_avg", - "plugin_azure_bridge", "plugin_httpstop", "plugin_inotify", "plugin_log", diff --git a/plugins/plugin_azure_bridge/src/lib.rs b/plugins/plugin_azure_bridge/src/lib.rs index 117f4093..73d15337 100644 --- a/plugins/plugin_azure_bridge/src/lib.rs +++ b/plugins/plugin_azure_bridge/src/lib.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use async_trait::async_trait; use azure_iot_sdk::DeviceKeyTokenSource; @@ -5,11 +7,14 @@ use azure_iot_sdk::IoTHubClient; use azure_iot_sdk::Message as IoTMessage; use miette::IntoDiagnostic; use tedge_api::address::ReplySender; +use tedge_api::make_receiver_bundle; +use tedge_api::message::NoReply; use tedge_api::plugin::BuiltPlugin; use tedge_api::plugin::Handle; use tedge_api::plugin::HandleTypes; use tedge_api::plugin::Message; use tedge_api::plugin::PluginDeclaration; +use tedge_api::Address; use tedge_api::Plugin; use tedge_api::PluginBuilder; use tedge_api::PluginConfiguration; @@ -28,6 +33,8 @@ struct AzureConfig { access_key: String, hubname: String, device_id: String, + #[serde(default)] + c2d_target: Option<String>, } #[async_trait] @@ -54,7 +61,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for AzureBridgeBuilder { &self, config: PluginConfiguration, cancellation_token: CancellationToken, - _core_comms: &PD, + core_comms: &PD, ) -> Result<BuiltPlugin, PluginError> where PD: 'async_trait, @@ -63,6 +70,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for AzureBridgeBuilder { access_key, hubname, device_id, + c2d_target: c2d_receiver, } = config.try_into().into_diagnostic()?; let token_source = DeviceKeyTokenSource::new(&hubname, &device_id, &access_key) @@ -72,17 +80,29 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for AzureBridgeBuilder { .await .map_err(|e| miette::Report::msg(e.to_string()))?; + let c2d_receiver = c2d_receiver + .map(|name| core_comms.get_address_for(&name)) + .transpose()?; + Ok(AzureBridge { - az_client: Mutex::new(az_client), - cancellation_token, + state: Arc::new(State { + az_client: Mutex::new(az_client), + cancellation_token, + c2d_receiver, + }), } .finish()) } } struct AzureBridge { + state: Arc<State>, +} + +struct State { az_client: Mutex<IoTHubClient>, cancellation_token: CancellationToken, + c2d_receiver: Option<Address<Cloud2DeviceMessageReceiver>>, } impl PluginDeclaration for AzureBridge { @@ -92,6 +112,29 @@ impl PluginDeclaration for AzureBridge { #[async_trait] impl Plugin for AzureBridge { async fn start(&mut self) -> Result<(), PluginError> { + let mut client = self.state.az_client.lock().await; + let mut receiver = client.get_receiver().await; + + let cancel = self.state.cancellation_token.child_token(); + let state = self.state.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + message = receiver.recv() => { + match message { + None => break, + Some(message) => { + tokio::spawn(handle_message(message, state.clone())); + } + } + } + _ = cancel.cancelled() => { + break; + } + } + } + }); + Ok(()) } @@ -100,6 +143,56 @@ impl Plugin for AzureBridge { } } +#[derive(Debug)] +pub struct Cloud2DeviceMessage { + pub data: Vec<u8>, +} + +impl Message for Cloud2DeviceMessage { + type Reply = NoReply; +} + +make_receiver_bundle!(struct Cloud2DeviceMessageReceiver(Cloud2DeviceMessage)); + +impl Cloud2DeviceMessage { + /// Get a reference to the cloud2 device message's data. + #[must_use] + pub fn data(&self) -> &[u8] { + self.data.as_ref() + } +} + +async fn handle_message(message: azure_iot_sdk::MessageType, client: Arc<State>) { + match message { + azure_iot_sdk::MessageType::C2DMessage(cloud_message) => { + tracing::event!( + tracing::Level::TRACE, + "Received Cloud2Device message of {} bytes", + cloud_message.body.len() + ); + + if let Some(addr) = client.c2d_receiver.as_ref() { + tracing::trace!("Sending cloud message to target"); + if let Err(_) = addr + .send(Cloud2DeviceMessage { + data: cloud_message.body, + }) + .await + { + tracing::error!( + "Could not forward Cloud2Device message as the recipient has disappeared!" + ); + } + } else { + tracing::warn!("Could not forward Cloud2Device message as no recipient has been set. Discarding the message."); + } + } + azure_iot_sdk::MessageType::DesiredPropertyUpdate(_) => (), + azure_iot_sdk::MessageType::DirectMethod(_) => (), + azure_iot_sdk::MessageType::ErrorReceive(err) => tracing::warn!("Received an error: {:?}", err), + } +} + #[async_trait] impl Handle<Measurement> for AzureBridge { async fn handle_message( @@ -107,7 +200,7 @@ impl Handle<Measurement> for AzureBridge { message: Measurement, _sender: ReplySender<<Measurement as Message>::Reply>, ) -> Result<(), PluginError> { - let mut client = self.az_client.lock().await; + let mut client = self.state.az_client.lock().await; let message = IoTMessage::builder() .set_body(serde_json::to_vec(&message).into_diagnostic()?) |