diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-05-16 10:25:36 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-20 12:16:12 +0200 |
commit | 43ed68145e7044c0d8f170af794d71c2fa93599a (patch) | |
tree | 7816fa9bfdf8ab17d8098386637389f932e895bc /crates/core/tedge_api | |
parent | 02272e24a128f273416c92cd7c00e2b97ca19a02 (diff) |
Replace tokio MPSC with a direct Future
MPSC are a heavy form of indirection that is potentially not needed.
This patch removes them and instead opts to use a Fn Generator of
futures that correspond to the model of communication One Question ->
One Reponse.
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
Reviewed-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'crates/core/tedge_api')
-rw-r--r-- | crates/core/tedge_api/examples/heartbeat.rs | 121 | ||||
-rw-r--r-- | crates/core/tedge_api/examples/universal_log.rs | 123 | ||||
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 99 |
3 files changed, 214 insertions, 129 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 37e9c8ec..654c4311 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::FutureExt; @@ -9,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -294,11 +295,17 @@ impl Plugin for CriticalService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec<MessageType>, - receiver: Option<tedge_api::address::MessageReceiver>, - sender: tedge_api::address::MessageSender, + sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -309,13 +316,12 @@ struct Communication { impl Communication { pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -349,7 +355,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -412,59 +420,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); - - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -472,11 +476,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs index f2b3dcea..82ff3130 100644 --- a/crates/core/tedge_api/examples/universal_log.rs +++ b/crates/core/tedge_api/examples/universal_log.rs @@ -1,6 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; +use futures::FutureExt; use tedge_api::{ address::ReplySenderFor, message::{AnyMessage, MessageType}, @@ -8,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -229,6 +231,7 @@ impl Plugin for LogService { } } +// // The following pieces of code would be implemented by a "core" component, that is responsible for // setting up plugins and their communication. // @@ -238,11 +241,17 @@ impl Plugin for LogService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec<MessageType>, - receiver: Option<tedge_api::address::MessageReceiver>, - sender: tedge_api::address::MessageSender, + sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -253,13 +262,12 @@ struct Communication { impl Communication { pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -293,7 +301,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -356,59 +366,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; - - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -416,11 +422,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index ef50edb4..007be13f 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -1,6 +1,7 @@ -use std::{marker::PhantomData, time::Duration}; +use std::{marker::PhantomData, sync::Arc, time::Duration}; -use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; +use futures::future::BoxFuture; +use tokio::sync::RwLock; use crate::{ message::MessageType, @@ -17,9 +18,77 @@ pub struct InternalMessage { pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>, } +#[doc(hidden)] +#[derive(Debug)] +pub enum ShouldWait { + Wait, + DontWait, + Timeout(std::time::Duration), +} + +#[doc(hidden)] +pub type MessageFutureProducer = dyn Fn(InternalMessage, ShouldWait) -> BoxFuture<'static, Result<(), InternalMessage>> + + Sync + + Send; + +#[doc(hidden)] +#[derive(Clone)] +pub struct InnerMessageSender { + send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for InnerMessageSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InnerMessageSender").finish_non_exhaustive() + } +} + +impl InnerMessageSender { + pub fn new(send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>) -> Self { + Self { send_provider } + } + + async fn send(&self, message: InternalMessage) -> Result<(), InternalMessage> { + let lock = self.send_provider.read().await; + if let Some(sender) = &*lock { + let sender = (*sender)(message, ShouldWait::Wait); + + sender.await + } else { + Err(message) + } + } + + async fn try_send(&self, message: InternalMessage) -> Result<(), InternalMessage> { + let lock = self.send_provider.read().await; + if let Some(sender) = &*lock { + let sender = (*sender)(message, ShouldWait::DontWait); + + sender.await + } else { + Err(message) + } + } + + async fn send_timeout( + &self, + message: InternalMessage, + timeout: Duration, + ) -> Result<(), InternalMessage> { + let lock = self.send_provider.read().await; + if let Some(sender) = &*lock { + let sender = (*sender)(message, ShouldWait::Timeout(timeout)); + + sender.await + } else { + Err(message) + } + } +} + /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME #[doc(hidden)] -pub type MessageSender = tokio::sync::mpsc::Sender<InternalMessage>; +pub type MessageSender = InnerMessageSender; /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME #[doc(hidden)] @@ -94,7 +163,7 @@ impl<RB: ReceiverBundle> Address<RB> { reply_sender: sender, }) .await - .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?; + .map_err(|msg| *msg.data.downcast::<M>().unwrap())?; Ok(ReplyReceiverFor { _pd: PhantomData, @@ -114,7 +183,7 @@ impl<RB: ReceiverBundle> Address<RB> { /// /// The error is returned if the receiving side (the plugin that is addressed) cannot currently /// receive messages (either because it is closed or the queue is full). - pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> { + pub async fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> { let (sender, receiver) = tokio::sync::oneshot::channel(); self.sender @@ -122,11 +191,8 @@ impl<RB: ReceiverBundle> Address<RB> { data: Box::new(msg), reply_sender: sender, }) - .map_err(|msg| match msg { - TrySendError::Full(data) | TrySendError::Closed(data) => { - *data.data.downcast::<M>().unwrap() - } - })?; + .await + .map_err(|msg| *msg.data.downcast::<M>().unwrap())?; Ok(ReplyReceiverFor { _pd: PhantomData, @@ -157,11 +223,7 @@ impl<RB: ReceiverBundle> Address<RB> { timeout, ) .await - .map_err(|msg| match msg { - SendTimeoutError::Timeout(data) | SendTimeoutError::Closed(data) => { - *data.data.downcast::<M>().unwrap() - } - })?; + .map_err(|msg| *msg.data.downcast::<M>().unwrap())?; Ok(ReplyReceiverFor { _pd: PhantomData, @@ -319,10 +381,13 @@ macro_rules! make_receiver_bundle { #[cfg(test)] mod tests { + use std::sync::Arc; + use static_assertions::{assert_impl_all, assert_not_impl_any}; + use tokio::sync::RwLock; use crate::{ - address::{ReplyReceiverFor, ReplySenderFor}, + address::{InnerMessageSender, ReplyReceiverFor, ReplySenderFor}, make_receiver_bundle, plugin::{AcceptsReplies, Message}, Address, @@ -370,7 +435,7 @@ mod tests { #[test] fn check_could_receive() { - let (sender, _receiver) = tokio::sync::mpsc::channel(1); + let sender = InnerMessageSender::new(Arc::new(RwLock::new(None))); let addr: Address<FooBar> = Address { _pd: std::marker::PhantomData, sender, |