summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_api/examples
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-05-16 10:25:36 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-20 12:16:12 +0200
commit43ed68145e7044c0d8f170af794d71c2fa93599a (patch)
tree7816fa9bfdf8ab17d8098386637389f932e895bc /crates/core/tedge_api/examples
parent02272e24a128f273416c92cd7c00e2b97ca19a02 (diff)
Replace tokio MPSC with a direct Future
MPSC are a heavy form of indirection that is potentially not needed. This patch removes them and instead opts to use a Fn Generator of futures that correspond to the model of communication One Question -> One Reponse. Signed-off-by: Marcel Müller <m.mueller@ifm.com> Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com> Reviewed-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'crates/core/tedge_api/examples')
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs121
-rw-r--r--crates/core/tedge_api/examples/universal_log.rs123
2 files changed, 132 insertions, 112 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 37e9c8ec..654c4311 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::FutureExt;
@@ -9,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -294,11 +295,17 @@ impl Plugin for CriticalService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -309,13 +316,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -349,7 +355,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -412,59 +420,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
-
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -472,11 +476,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs
index f2b3dcea..82ff3130 100644
--- a/crates/core/tedge_api/examples/universal_log.rs
+++ b/crates/core/tedge_api/examples/universal_log.rs
@@ -1,6 +1,7 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
+use futures::FutureExt;
use tedge_api::{
address::ReplySenderFor,
message::{AnyMessage, MessageType},
@@ -8,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -229,6 +231,7 @@ impl Plugin for LogService {
}
}
+//
// The following pieces of code would be implemented by a "core" component, that is responsible for
// setting up plugins and their communication.
//
@@ -238,11 +241,17 @@ impl Plugin for LogService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -253,13 +262,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -293,7 +301,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -356,59 +366,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
-
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -416,11 +422,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await