summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-04-19 19:26:37 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-04-22 08:42:39 +0200
commit356725a22c29035396e0bcbf5e89ff49839e1e56 (patch)
tree05bd2f2e1ca9e094b830ce9b49739496fa93cb90
parent6795e4afbc0c331e7dde453e03f9ae0286767d54 (diff)
Support receiving messages from the cloud
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r--Cargo.lock61
-rw-r--r--plugins/plugin_azure_bridge/src/lib.rs101
2 files changed, 118 insertions, 44 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 1f1f5338..6efdbf40 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2031,19 +2031,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
-name = "openssl-sys"
-version = "0.9.72"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
-dependencies = [
- "autocfg",
- "cc",
- "libc",
- "pkg-config",
- "vcpkg",
-]
-
-[[package]]
name = "openssl"
version = "0.10.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2064,12 +2051,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
+name = "openssl-sys"
+version = "0.9.72"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
+dependencies = [
+ "autocfg",
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
+[[package]]
name = "os_str_bytes"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64"
dependencies = [
-"memchr",
+ "memchr",
]
[[package]]
@@ -2078,10 +2078,16 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53cdc5b785b7a58c5aad8216b3dfa114df64b0b06ae6e1501cef91df2fbdf8f9"
dependencies = [
-"winapi",
+ "winapi",
]
[[package]]
+name = "owo-colors"
+version = "3.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e72e30578e0d0993c8ae20823dd9cff2bc5517d2f586a8aef462a581e8a03eb"
+
+[[package]]
name = "paho-mqtt"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2108,12 +2114,6 @@ dependencies = [
]
[[package]]
-name = "owo-colors"
-version = "3.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5e72e30578e0d0993c8ae20823dd9cff2bc5517d2f586a8aef462a581e8a03eb"
-
-[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3340,24 +3340,6 @@ dependencies = [
]
[[package]]
-name = "supports-hyperlinks"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "590b34f7c5f01ecc9d78dba4b3f445f31df750a67621cf31626f3b7441ce6406"
-dependencies = [
- "atty",
-]
-
-[[package]]
-name = "supports-unicode"
-version = "1.0.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8b945e45b417b125a8ec51f1b7df2f8df7920367700d1f98aedd21e5735f8b2"
-dependencies = [
- "atty",
-]
-
-[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3473,7 +3455,6 @@ dependencies = [
"env_logger 0.9.0",
"miette",
"plugin_avg",
- "plugin_azure_bridge",
"plugin_httpstop",
"plugin_inotify",
"plugin_log",
diff --git a/plugins/plugin_azure_bridge/src/lib.rs b/plugins/plugin_azure_bridge/src/lib.rs
index 117f4093..73d15337 100644
--- a/plugins/plugin_azure_bridge/src/lib.rs
+++ b/plugins/plugin_azure_bridge/src/lib.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use async_trait::async_trait;
use azure_iot_sdk::DeviceKeyTokenSource;
@@ -5,11 +7,14 @@ use azure_iot_sdk::IoTHubClient;
use azure_iot_sdk::Message as IoTMessage;
use miette::IntoDiagnostic;
use tedge_api::address::ReplySender;
+use tedge_api::make_receiver_bundle;
+use tedge_api::message::NoReply;
use tedge_api::plugin::BuiltPlugin;
use tedge_api::plugin::Handle;
use tedge_api::plugin::HandleTypes;
use tedge_api::plugin::Message;
use tedge_api::plugin::PluginDeclaration;
+use tedge_api::Address;
use tedge_api::Plugin;
use tedge_api::PluginBuilder;
use tedge_api::PluginConfiguration;
@@ -28,6 +33,8 @@ struct AzureConfig {
access_key: String,
hubname: String,
device_id: String,
+ #[serde(default)]
+ c2d_target: Option<String>,
}
#[async_trait]
@@ -54,7 +61,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for AzureBridgeBuilder {
&self,
config: PluginConfiguration,
cancellation_token: CancellationToken,
- _core_comms: &PD,
+ core_comms: &PD,
) -> Result<BuiltPlugin, PluginError>
where
PD: 'async_trait,
@@ -63,6 +70,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for AzureBridgeBuilder {
access_key,
hubname,
device_id,
+ c2d_target: c2d_receiver,
} = config.try_into().into_diagnostic()?;
let token_source = DeviceKeyTokenSource::new(&hubname, &device_id, &access_key)
@@ -72,17 +80,29 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for AzureBridgeBuilder {
.await
.map_err(|e| miette::Report::msg(e.to_string()))?;
+ let c2d_receiver = c2d_receiver
+ .map(|name| core_comms.get_address_for(&name))
+ .transpose()?;
+
Ok(AzureBridge {
- az_client: Mutex::new(az_client),
- cancellation_token,
+ state: Arc::new(State {
+ az_client: Mutex::new(az_client),
+ cancellation_token,
+ c2d_receiver,
+ }),
}
.finish())
}
}
struct AzureBridge {
+ state: Arc<State>,
+}
+
+struct State {
az_client: Mutex<IoTHubClient>,
cancellation_token: CancellationToken,
+ c2d_receiver: Option<Address<Cloud2DeviceMessageReceiver>>,
}
impl PluginDeclaration for AzureBridge {
@@ -92,6 +112,29 @@ impl PluginDeclaration for AzureBridge {
#[async_trait]
impl Plugin for AzureBridge {
async fn start(&mut self) -> Result<(), PluginError> {
+ let mut client = self.state.az_client.lock().await;
+ let mut receiver = client.get_receiver().await;
+
+ let cancel = self.state.cancellation_token.child_token();
+ let state = self.state.clone();
+ tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ message = receiver.recv() => {
+ match message {
+ None => break,
+ Some(message) => {
+ tokio::spawn(handle_message(message, state.clone()));
+ }
+ }
+ }
+ _ = cancel.cancelled() => {
+ break;
+ }
+ }
+ }
+ });
+
Ok(())
}
@@ -100,6 +143,56 @@ impl Plugin for AzureBridge {
}
}
+#[derive(Debug)]
+pub struct Cloud2DeviceMessage {
+ pub data: Vec<u8>,
+}
+
+impl Message for Cloud2DeviceMessage {
+ type Reply = NoReply;
+}
+
+make_receiver_bundle!(struct Cloud2DeviceMessageReceiver(Cloud2DeviceMessage));
+
+impl Cloud2DeviceMessage {
+ /// Get a reference to the cloud2 device message's data.
+ #[must_use]
+ pub fn data(&self) -> &[u8] {
+ self.data.as_ref()
+ }
+}
+
+async fn handle_message(message: azure_iot_sdk::MessageType, client: Arc<State>) {
+ match message {
+ azure_iot_sdk::MessageType::C2DMessage(cloud_message) => {
+ tracing::event!(
+ tracing::Level::TRACE,
+ "Received Cloud2Device message of {} bytes",
+ cloud_message.body.len()
+ );
+
+ if let Some(addr) = client.c2d_receiver.as_ref() {
+ tracing::trace!("Sending cloud message to target");
+ if let Err(_) = addr
+ .send(Cloud2DeviceMessage {
+ data: cloud_message.body,
+ })
+ .await
+ {
+ tracing::error!(
+ "Could not forward Cloud2Device message as the recipient has disappeared!"
+ );
+ }
+ } else {
+ tracing::warn!("Could not forward Cloud2Device message as no recipient has been set. Discarding the message.");
+ }
+ }
+ azure_iot_sdk::MessageType::DesiredPropertyUpdate(_) => (),
+ azure_iot_sdk::MessageType::DirectMethod(_) => (),
+ azure_iot_sdk::MessageType::ErrorReceive(err) => tracing::warn!("Received an error: {:?}", err),
+ }
+}
+
#[async_trait]
impl Handle<Measurement> for AzureBridge {
async fn handle_message(
@@ -107,7 +200,7 @@ impl Handle<Measurement> for AzureBridge {
message: Measurement,
_sender: ReplySender<<Measurement as Message>::Reply>,
) -> Result<(), PluginError> {
- let mut client = self.az_client.lock().await;
+ let mut client = self.state.az_client.lock().await;
let message = IoTMessage::builder()
.set_body(serde_json::to_vec(&message).into_diagnostic()?)