summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-05-19 12:35:23 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-20 13:29:57 +0200
commitfabedb6c6ce388a82f0fb905f604e5e5bb8fe533 (patch)
treee749f6bd4f14b7c6c42de6f28622771ea2fc38d9
parentb97acb7dfcdd710b5c4913300fdfb33c953c4c52 (diff)
Update core to use the new tedge_api interface
The tedge_api interface now no longer uses an MPSC channel but instead requires the core to provide a 'MessageProvider' function. This function is a `Fn` closure, meaning that it: - Owns its contents, no borrows of any kinds (: 'static) - Can be called many times This closure has to return a BoxedFuture which when polled will spawn a task that calls `handle_message` on the plugin it was built with. Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r--crates/core/tedge_core/src/communication.rs9
-rw-r--r--crates/core/tedge_core/src/core_task.rs48
-rw-r--r--crates/core/tedge_core/src/lib.rs1
-rw-r--r--crates/core/tedge_core/src/plugin_task.rs85
-rw-r--r--crates/core/tedge_core/src/reactor.rs8
5 files changed, 66 insertions, 85 deletions
diff --git a/crates/core/tedge_core/src/communication.rs b/crates/core/tedge_core/src/communication.rs
index 8033b2a8..c7c6c441 100644
--- a/crates/core/tedge_core/src/communication.rs
+++ b/crates/core/tedge_core/src/communication.rs
@@ -6,7 +6,6 @@ use tedge_api::error::DirectoryError;
use tedge_api::message::MessageType;
use tedge_api::plugin::PluginDirectory as ApiPluginDirectory;
use tedge_api::Address;
-use tokio::sync::RwLock;
use crate::errors::TedgeApplicationError;
@@ -102,6 +101,9 @@ pub(crate) struct PluginInfo {
/// The types of messages the plugin claims to handle
pub(crate) types: Vec<MessageType>,
+ /// How many concurrent handlers are allowed to be spawned
+ pub(crate) channel_size: usize,
+
/// A sender to send messages to the plugin
pub(crate) communicator: MessageSender,
}
@@ -110,8 +112,8 @@ impl std::fmt::Debug for PluginInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PluginInfo")
.field("types", &self.types)
- .field("sender", &"...")
- .finish()
+ .field("channel_size", &self.channel_size)
+ .finish_non_exhaustive()
}
}
@@ -119,6 +121,7 @@ impl PluginInfo {
pub(crate) fn new(types: Vec<MessageType>, channel_size: usize) -> Self {
Self {
types,
+ channel_size,
communicator: MessageSender::new(Default::default()),
}
}
diff --git a/crates/core/tedge_core/src/core_task.rs b/crates/core/tedge_core/src/core_task.rs
index 7f34af0b..059e73ae 100644
--- a/crates/core/tedge_core/src/core_task.rs
+++ b/crates/core/tedge_core/src/core_task.rs
@@ -1,14 +1,17 @@
+use std::sync::Arc;
+
use async_trait::async_trait;
use tedge_api::{
- address::{MessageReceiver, MessageSender, ReplySenderFor},
+ address::{MessageSender, ReplySenderFor},
message::StopCore,
plugin::{Handle, PluginExt},
Plugin, PluginError,
};
+use tokio::sync::{RwLock, Semaphore};
use tokio_util::sync::CancellationToken;
-use tracing::{debug, trace, warn, Instrument};
+use tracing::{debug, trace, warn};
-use crate::errors::Result;
+use crate::{errors::Result, message_handler::make_message_handler};
/// Helper type in the crate implementation for handling the actual message passing
///
@@ -43,8 +46,19 @@ impl CoreTask {
let running_core = RunningCore {
sender: self.internal_sender,
};
- let built_plugin = running_core.finish();
- let mut receiver_closed = false;
+
+ // allocate a mpsc channel with one element size
+ // one element is enough because we stop the plugin anyways if there was a panic
+ let (panic_err_sender, _panic_err_recv) = tokio::sync::mpsc::channel(1);
+
+ self.receiver
+ .init_with(make_message_handler(
+ String::from("core_task"),
+ Arc::new(Semaphore::new(10)),
+ Arc::new(RwLock::new(running_core.finish())),
+ panic_err_sender,
+ ))
+ .await;
loop {
tokio::select! {
@@ -67,34 +81,16 @@ impl CoreTask {
}
}
},
-
- next_message = self.receiver.recv(), if !receiver_closed => {
- trace!("Received message");
- match next_message {
- Some(msg) => {
- let handle_msg_res = built_plugin.handle_message(msg)
- .instrument(tracing::trace_span!("core.core_task.handle_message"))
- .await;
-
- match handle_msg_res {
- Ok(_) => debug!("Core handled message successfully"),
- Err(e) => warn!("Core failed to handle message: {:?}", e),
- }
- },
-
- None => {
- receiver_closed = true;
- debug!("Receiver closed for Core");
- },
- }
- }
}
}
+ self.receiver.reset().await;
+
Ok(())
}
}
+#[derive(Clone)]
struct RunningCore {
sender: tokio::sync::mpsc::Sender<CoreInternalMessage>,
}
diff --git a/crates/core/tedge_core/src/lib.rs b/crates/core/tedge_core/src/lib.rs
index 174f94cf..c9a415e4 100644
--- a/crates/core/tedge_core/src/lib.rs
+++ b/crates/core/tedge_core/src/lib.rs
@@ -22,6 +22,7 @@ mod plugin_task;
mod reactor;
mod task;
mod utils;
+mod message_handler;
pub use crate::communication::PluginDirectory;
use crate::configuration::PluginInstanceConfiguration;
diff --git a/crates/core/tedge_core/src/plugin_task.rs b/crates/core/tedge_core/src/plugin_task.rs
index d303bb04..771a3431 100644
--- a/crates/core/tedge_core/src/plugin_task.rs
+++ b/crates/core/tedge_core/src/plugin_task.rs
@@ -1,27 +1,27 @@
use std::sync::Arc;
use futures::FutureExt;
-use tedge_api::address::MessageReceiver;
use tedge_api::address::MessageSender;
use tedge_api::plugin::BuiltPlugin;
use tokio::sync::RwLock;
+use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
-use tracing::warn;
use tracing::Instrument;
-use tracing::Span;
use crate::errors::Result;
use crate::errors::TedgeApplicationError;
+use crate::message_handler::make_message_handler;
use crate::task::Task;
/// Type for handling the lifecycle of one individual Plugin instance
pub struct PluginTask {
plugin_name: String,
plugin: BuiltPlugin,
+ channel_size: usize,
plugin_msg_receiver: MessageSender,
task_cancel_token: CancellationToken,
shutdown_timeout: std::time::Duration,
@@ -39,6 +39,7 @@ impl PluginTask {
pub fn new(
plugin_name: String,
plugin: BuiltPlugin,
+ channel_size: usize,
plugin_msg_receiver: MessageSender,
task_cancel_token: CancellationToken,
shutdown_timeout: std::time::Duration,
@@ -46,6 +47,7 @@ impl PluginTask {
Self {
plugin_name,
plugin,
+ channel_size,
plugin_msg_receiver,
task_cancel_token,
shutdown_timeout,
@@ -93,11 +95,12 @@ impl Task for PluginTask {
plugin_mainloop(
plugin.clone(),
&self.plugin_name,
+ self.channel_size,
plugin_msg_receiver,
task_cancel_token,
)
.in_current_span()
- .await?;
+ .await?
}
trace!("Mainloop for plugin '{}' finished", self.plugin_name);
@@ -117,14 +120,11 @@ impl Task for PluginTask {
/// If the starting of the plugin failed, this will error as well, of course.
#[tracing::instrument(skip(plugin, plugin_name))]
async fn plugin_setup(plugin: Arc<RwLock<BuiltPlugin>>, plugin_name: &str) -> Result<()> {
- let mut plug = plugin
- .write()
- .instrument(tracing::trace_span!("core.plugin_task.setup.lock"))
- .await;
+ let mut plug_write = plugin.write().await;
// we can use AssertUnwindSafe here because we're _not_ using the plugin after a panic has
// happened.
- match std::panic::AssertUnwindSafe(plug.plugin_mut().start())
+ match std::panic::AssertUnwindSafe(plug_write.plugin_mut().start())
.catch_unwind()
.instrument(tracing::trace_span!("core.plugin_task.setup.start", name = %plugin_name))
.await
@@ -139,9 +139,11 @@ async fn plugin_setup(plugin: Arc<RwLock<BuiltPlugin>>, plugin_name: &str) -> Re
));
}
Ok(res) => {
- res.map_err(|e| TedgeApplicationError::PluginSetupFailed(plugin_name.to_string(), e))
+ res.map_err(|e| TedgeApplicationError::PluginSetupFailed(plugin_name.to_string(), e))?;
}
- }
+ };
+
+ Ok(())
}
/// Run the "main loop" for the Plugin instance
@@ -155,59 +157,28 @@ async fn plugin_setup(plugin: Arc<RwLock<BuiltPlugin>>, plugin_name: &str) -> Re
async fn plugin_mainloop(
plugin: Arc<RwLock<BuiltPlugin>>,
plugin_name: &str,
- mut plugin_msg_receiver: MessageReceiver,
+ plugin_channel_size: usize,
+ plugin_msg_receiver: MessageSender,
task_cancel_token: CancellationToken,
) -> Result<()> {
- let mut receiver_closed = false;
-
// allocate a mpsc channel with one element size
// one element is enough because we stop the plugin anyways if there was a panic
let (panic_err_sender, mut panic_err_recv) = tokio::sync::mpsc::channel(1);
+ plugin_msg_receiver
+ .init_with(make_message_handler(
+ plugin_name.to_string(),
+ Arc::new(Semaphore::new(plugin_channel_size)),
+ plugin,
+ panic_err_sender,
+ ))
+ .await;
+
loop {
tokio::select! {
- next_message = plugin_msg_receiver.recv(), if !receiver_closed => {
- match next_message {
- Some(msg) => {
- let pname = plugin_name.to_string();
- let plug = plugin.clone();
- let panic_err_sender = panic_err_sender.clone();
- let parent_span = Span::current();
-
- tokio::spawn(async move {
- let handle_message_span = tracing::trace_span!(parent: parent_span, "core.plugin_task.mainloop.handle_message", msg = ?msg);
- let handled_message = {
- let read_plug = plug.read().await;
- std::panic::AssertUnwindSafe(read_plug.handle_message(msg))
- .catch_unwind()
- .instrument(handle_message_span)
- .await
- };
-
- match handled_message {
- Err(_) => {
- // panic happened in handle_message() implementation
-
- error!("Plugin {} paniced in message handler", pname);
- let _ = panic_err_sender
- .send(TedgeApplicationError::PluginMessageHandlerPaniced(pname.to_string()));
- },
- Ok(Ok(_)) => trace!("Plugin handled message successfully"),
- Ok(Err(e)) => warn!("Plugin failed to handle message: {:?}", e),
- }
- }.in_current_span());
- },
-
- None => {
- receiver_closed = true;
- debug!("Receiver closed for {} plugin", plugin_name);
- },
- }
- },
-
panic_err = panic_err_recv.recv() => {
- if let Some(panic_err) = panic_err {
- return Err(panic_err)
+ if let Some(()) = panic_err {
+ break
}
}
@@ -219,6 +190,10 @@ async fn plugin_mainloop(
}
}
}
+
+ // We reset the message handler, thus releasing any Arc<BuiltPlugin> that may still exist
+ plugin_msg_receiver.reset().await;
+
Ok(())
}
diff --git a/crates/core/tedge_core/src/reactor.rs b/crates/core/tedge_core/src/reactor.rs
index f67bf2c5..20e82fa8 100644
--- a/crates/core/tedge_core/src/reactor.rs
+++ b/crates/core/tedge_core/src/reactor.rs
@@ -41,6 +41,7 @@ impl std::fmt::Debug for Reactor {
struct PluginTaskPrep {
name: String,
plugin: BuiltPlugin,
+ channel_size: usize,
plugin_msg_comms: tedge_api::address::MessageSender,
cancellation_token: CancellationToken,
}
@@ -157,6 +158,7 @@ impl Reactor {
PluginTask::new(
prep.name,
prep.plugin,
+ prep.channel_size,
prep.plugin_msg_comms,
prep.cancellation_token,
self.0.config().plugin_shutdown_timeout(),
@@ -266,11 +268,15 @@ impl Reactor {
.await
.map_err(TedgeApplicationError::PluginInstantiationFailed)
.map(|plugin| {
- trace!("Instantiation of plugin '{}' successfull", plugin_name);
+ trace!(plugin.name = ?plugin_name, "Instantiation of plugin successfull");
+
+ // TODO: Get the correct one?
+ let channel_size = 10;
PluginTaskPrep {
name: plugin_name.to_string(),
plugin,
+ channel_size,
plugin_msg_comms,
cancellation_token,
}