From 43ed68145e7044c0d8f170af794d71c2fa93599a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcel=20M=C3=BCller?= Date: Mon, 16 May 2022 10:25:36 +0200 Subject: Replace tokio MPSC with a direct Future MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MPSC are a heavy form of indirection that is potentially not needed. This patch removes them and instead opts to use a Fn Generator of futures that correspond to the model of communication One Question -> One Reponse. Signed-off-by: Marcel Müller Signed-off-by: Matthias Beyer Reviewed-by: Matthias Beyer --- crates/core/tedge_api/examples/heartbeat.rs | 121 ++++++++++++----------- crates/core/tedge_api/examples/universal_log.rs | 123 +++++++++++++----------- 2 files changed, 132 insertions(+), 112 deletions(-) (limited to 'crates/core/tedge_api/examples') diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 37e9c8ec..654c4311 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::FutureExt; @@ -9,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -294,11 +295,17 @@ impl Plugin for CriticalService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec, - receiver: Option, - sender: tedge_api::address::MessageSender, + sender: Arc>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -309,13 +316,12 @@ struct Communication { impl Communication { pub fn declare_plugin>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -349,7 +355,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -412,59 +420,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); - - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -472,11 +476,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs index f2b3dcea..82ff3130 100644 --- a/crates/core/tedge_api/examples/universal_log.rs +++ b/crates/core/tedge_api/examples/universal_log.rs @@ -1,6 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; +use futures::FutureExt; use tedge_api::{ address::ReplySenderFor, message::{AnyMessage, MessageType}, @@ -8,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -229,6 +231,7 @@ impl Plugin for LogService { } } +// // The following pieces of code would be implemented by a "core" component, that is responsible for // setting up plugins and their communication. // @@ -238,11 +241,17 @@ impl Plugin for LogService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec, - receiver: Option, - sender: tedge_api::address::MessageSender, + sender: Arc>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -253,13 +262,12 @@ struct Communication { impl Communication { pub fn declare_plugin>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -293,7 +301,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -356,59 +366,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; - - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -416,11 +422,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await -- cgit v1.2.3