diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-05-19 12:35:23 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-05-20 13:29:57 +0200 |
commit | fabedb6c6ce388a82f0fb905f604e5e5bb8fe533 (patch) | |
tree | e749f6bd4f14b7c6c42de6f28622771ea2fc38d9 | |
parent | b97acb7dfcdd710b5c4913300fdfb33c953c4c52 (diff) |
Update core to use the new tedge_api interface
The tedge_api interface now no longer uses an MPSC channel but instead
requires the core to provide a 'MessageProvider' function. This function
is a `Fn` closure, meaning that it:
- Owns its contents, no borrows of any kinds (: 'static)
- Can be called many times
This closure has to return a BoxedFuture which when polled will spawn a
task that calls `handle_message` on the plugin it was built with.
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r-- | crates/core/tedge_core/src/communication.rs | 9 | ||||
-rw-r--r-- | crates/core/tedge_core/src/core_task.rs | 48 | ||||
-rw-r--r-- | crates/core/tedge_core/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/core/tedge_core/src/plugin_task.rs | 85 | ||||
-rw-r--r-- | crates/core/tedge_core/src/reactor.rs | 8 |
5 files changed, 66 insertions, 85 deletions
diff --git a/crates/core/tedge_core/src/communication.rs b/crates/core/tedge_core/src/communication.rs index 8033b2a8..c7c6c441 100644 --- a/crates/core/tedge_core/src/communication.rs +++ b/crates/core/tedge_core/src/communication.rs @@ -6,7 +6,6 @@ use tedge_api::error::DirectoryError; use tedge_api::message::MessageType; use tedge_api::plugin::PluginDirectory as ApiPluginDirectory; use tedge_api::Address; -use tokio::sync::RwLock; use crate::errors::TedgeApplicationError; @@ -102,6 +101,9 @@ pub(crate) struct PluginInfo { /// The types of messages the plugin claims to handle pub(crate) types: Vec<MessageType>, + /// How many concurrent handlers are allowed to be spawned + pub(crate) channel_size: usize, + /// A sender to send messages to the plugin pub(crate) communicator: MessageSender, } @@ -110,8 +112,8 @@ impl std::fmt::Debug for PluginInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PluginInfo") .field("types", &self.types) - .field("sender", &"...") - .finish() + .field("channel_size", &self.channel_size) + .finish_non_exhaustive() } } @@ -119,6 +121,7 @@ impl PluginInfo { pub(crate) fn new(types: Vec<MessageType>, channel_size: usize) -> Self { Self { types, + channel_size, communicator: MessageSender::new(Default::default()), } } diff --git a/crates/core/tedge_core/src/core_task.rs b/crates/core/tedge_core/src/core_task.rs index 7f34af0b..059e73ae 100644 --- a/crates/core/tedge_core/src/core_task.rs +++ b/crates/core/tedge_core/src/core_task.rs @@ -1,14 +1,17 @@ +use std::sync::Arc; + use async_trait::async_trait; use tedge_api::{ - address::{MessageReceiver, MessageSender, ReplySenderFor}, + address::{MessageSender, ReplySenderFor}, message::StopCore, plugin::{Handle, PluginExt}, Plugin, PluginError, }; +use tokio::sync::{RwLock, Semaphore}; use tokio_util::sync::CancellationToken; -use tracing::{debug, trace, warn, Instrument}; +use tracing::{debug, trace, warn}; -use crate::errors::Result; +use crate::{errors::Result, message_handler::make_message_handler}; /// Helper type in the crate implementation for handling the actual message passing /// @@ -43,8 +46,19 @@ impl CoreTask { let running_core = RunningCore { sender: self.internal_sender, }; - let built_plugin = running_core.finish(); - let mut receiver_closed = false; + + // allocate a mpsc channel with one element size + // one element is enough because we stop the plugin anyways if there was a panic + let (panic_err_sender, _panic_err_recv) = tokio::sync::mpsc::channel(1); + + self.receiver + .init_with(make_message_handler( + String::from("core_task"), + Arc::new(Semaphore::new(10)), + Arc::new(RwLock::new(running_core.finish())), + panic_err_sender, + )) + .await; loop { tokio::select! { @@ -67,34 +81,16 @@ impl CoreTask { } } }, - - next_message = self.receiver.recv(), if !receiver_closed => { - trace!("Received message"); - match next_message { - Some(msg) => { - let handle_msg_res = built_plugin.handle_message(msg) - .instrument(tracing::trace_span!("core.core_task.handle_message")) - .await; - - match handle_msg_res { - Ok(_) => debug!("Core handled message successfully"), - Err(e) => warn!("Core failed to handle message: {:?}", e), - } - }, - - None => { - receiver_closed = true; - debug!("Receiver closed for Core"); - }, - } - } } } + self.receiver.reset().await; + Ok(()) } } +#[derive(Clone)] struct RunningCore { sender: tokio::sync::mpsc::Sender<CoreInternalMessage>, } diff --git a/crates/core/tedge_core/src/lib.rs b/crates/core/tedge_core/src/lib.rs index 174f94cf..c9a415e4 100644 --- a/crates/core/tedge_core/src/lib.rs +++ b/crates/core/tedge_core/src/lib.rs @@ -22,6 +22,7 @@ mod plugin_task; mod reactor; mod task; mod utils; +mod message_handler; pub use crate::communication::PluginDirectory; use crate::configuration::PluginInstanceConfiguration; diff --git a/crates/core/tedge_core/src/plugin_task.rs b/crates/core/tedge_core/src/plugin_task.rs index d303bb04..771a3431 100644 --- a/crates/core/tedge_core/src/plugin_task.rs +++ b/crates/core/tedge_core/src/plugin_task.rs @@ -1,27 +1,27 @@ use std::sync::Arc; use futures::FutureExt; -use tedge_api::address::MessageReceiver; use tedge_api::address::MessageSender; use tedge_api::plugin::BuiltPlugin; use tokio::sync::RwLock; +use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::error; use tracing::info; use tracing::trace; -use tracing::warn; use tracing::Instrument; -use tracing::Span; use crate::errors::Result; use crate::errors::TedgeApplicationError; +use crate::message_handler::make_message_handler; use crate::task::Task; /// Type for handling the lifecycle of one individual Plugin instance pub struct PluginTask { plugin_name: String, plugin: BuiltPlugin, + channel_size: usize, plugin_msg_receiver: MessageSender, task_cancel_token: CancellationToken, shutdown_timeout: std::time::Duration, @@ -39,6 +39,7 @@ impl PluginTask { pub fn new( plugin_name: String, plugin: BuiltPlugin, + channel_size: usize, plugin_msg_receiver: MessageSender, task_cancel_token: CancellationToken, shutdown_timeout: std::time::Duration, @@ -46,6 +47,7 @@ impl PluginTask { Self { plugin_name, plugin, + channel_size, plugin_msg_receiver, task_cancel_token, shutdown_timeout, @@ -93,11 +95,12 @@ impl Task for PluginTask { plugin_mainloop( plugin.clone(), &self.plugin_name, + self.channel_size, plugin_msg_receiver, task_cancel_token, ) .in_current_span() - .await?; + .await? } trace!("Mainloop for plugin '{}' finished", self.plugin_name); @@ -117,14 +120,11 @@ impl Task for PluginTask { /// If the starting of the plugin failed, this will error as well, of course. #[tracing::instrument(skip(plugin, plugin_name))] async fn plugin_setup(plugin: Arc<RwLock<BuiltPlugin>>, plugin_name: &str) -> Result<()> { - let mut plug = plugin - .write() - .instrument(tracing::trace_span!("core.plugin_task.setup.lock")) - .await; + let mut plug_write = plugin.write().await; // we can use AssertUnwindSafe here because we're _not_ using the plugin after a panic has // happened. - match std::panic::AssertUnwindSafe(plug.plugin_mut().start()) + match std::panic::AssertUnwindSafe(plug_write.plugin_mut().start()) .catch_unwind() .instrument(tracing::trace_span!("core.plugin_task.setup.start", name = %plugin_name)) .await @@ -139,9 +139,11 @@ async fn plugin_setup(plugin: Arc<RwLock<BuiltPlugin>>, plugin_name: &str) -> Re )); } Ok(res) => { - res.map_err(|e| TedgeApplicationError::PluginSetupFailed(plugin_name.to_string(), e)) + res.map_err(|e| TedgeApplicationError::PluginSetupFailed(plugin_name.to_string(), e))?; } - } + }; + + Ok(()) } /// Run the "main loop" for the Plugin instance @@ -155,59 +157,28 @@ async fn plugin_setup(plugin: Arc<RwLock<BuiltPlugin>>, plugin_name: &str) -> Re async fn plugin_mainloop( plugin: Arc<RwLock<BuiltPlugin>>, plugin_name: &str, - mut plugin_msg_receiver: MessageReceiver, + plugin_channel_size: usize, + plugin_msg_receiver: MessageSender, task_cancel_token: CancellationToken, ) -> Result<()> { - let mut receiver_closed = false; - // allocate a mpsc channel with one element size // one element is enough because we stop the plugin anyways if there was a panic let (panic_err_sender, mut panic_err_recv) = tokio::sync::mpsc::channel(1); + plugin_msg_receiver + .init_with(make_message_handler( + plugin_name.to_string(), + Arc::new(Semaphore::new(plugin_channel_size)), + plugin, + panic_err_sender, + )) + .await; + loop { tokio::select! { - next_message = plugin_msg_receiver.recv(), if !receiver_closed => { - match next_message { - Some(msg) => { - let pname = plugin_name.to_string(); - let plug = plugin.clone(); - let panic_err_sender = panic_err_sender.clone(); - let parent_span = Span::current(); - - tokio::spawn(async move { - let handle_message_span = tracing::trace_span!(parent: parent_span, "core.plugin_task.mainloop.handle_message", msg = ?msg); - let handled_message = { - let read_plug = plug.read().await; - std::panic::AssertUnwindSafe(read_plug.handle_message(msg)) - .catch_unwind() - .instrument(handle_message_span) - .await - }; - - match handled_message { - Err(_) => { - // panic happened in handle_message() implementation - - error!("Plugin {} paniced in message handler", pname); - let _ = panic_err_sender - .send(TedgeApplicationError::PluginMessageHandlerPaniced(pname.to_string())); - }, - Ok(Ok(_)) => trace!("Plugin handled message successfully"), - Ok(Err(e)) => warn!("Plugin failed to handle message: {:?}", e), - } - }.in_current_span()); - }, - - None => { - receiver_closed = true; - debug!("Receiver closed for {} plugin", plugin_name); - }, - } - }, - panic_err = panic_err_recv.recv() => { - if let Some(panic_err) = panic_err { - return Err(panic_err) + if let Some(()) = panic_err { + break } } @@ -219,6 +190,10 @@ async fn plugin_mainloop( } } } + + // We reset the message handler, thus releasing any Arc<BuiltPlugin> that may still exist + plugin_msg_receiver.reset().await; + Ok(()) } diff --git a/crates/core/tedge_core/src/reactor.rs b/crates/core/tedge_core/src/reactor.rs index f67bf2c5..20e82fa8 100644 --- a/crates/core/tedge_core/src/reactor.rs +++ b/crates/core/tedge_core/src/reactor.rs @@ -41,6 +41,7 @@ impl std::fmt::Debug for Reactor { struct PluginTaskPrep { name: String, plugin: BuiltPlugin, + channel_size: usize, plugin_msg_comms: tedge_api::address::MessageSender, cancellation_token: CancellationToken, } @@ -157,6 +158,7 @@ impl Reactor { PluginTask::new( prep.name, prep.plugin, + prep.channel_size, prep.plugin_msg_comms, prep.cancellation_token, self.0.config().plugin_shutdown_timeout(), @@ -266,11 +268,15 @@ impl Reactor { .await .map_err(TedgeApplicationError::PluginInstantiationFailed) .map(|plugin| { - trace!("Instantiation of plugin '{}' successfull", plugin_name); + trace!(plugin.name = ?plugin_name, "Instantiation of plugin successfull"); + + // TODO: Get the correct one? + let channel_size = 10; PluginTaskPrep { name: plugin_name.to_string(), plugin, + channel_size, plugin_msg_comms, cancellation_token, } |