summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-03-19 12:18:12 +0100
committerMarcel Müller <m.mueller@ifm.com>2022-03-21 08:53:49 +0100
commit196f7600234bc131956fad295959928153ded638 (patch)
treea22ca17b606cd17c7b628ecc5f4acce1a43464b1 /crates
parent2dc415263c691d021f8e60f5f72a0800e9b975de (diff)
Add reply functionality to messages
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates')
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs122
-rw-r--r--crates/core/tedge_api/src/address.rs86
-rw-r--r--crates/core/tedge_api/src/lib.rs4
-rw-r--r--crates/core/tedge_api/src/message.rs12
-rw-r--r--crates/core/tedge_api/src/plugin.rs65
5 files changed, 231 insertions, 58 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 12554c17..d2fc2f21 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -6,6 +6,8 @@ use std::{
use async_trait::async_trait;
use tedge_api::{
+ address::ReplySender,
+ message::NoReply,
plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt},
Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError,
};
@@ -13,15 +15,19 @@ use tedge_api::{
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
struct Heartbeat;
-impl Message for Heartbeat {}
+impl Message for Heartbeat {
+ type Reply = HeartbeatStatus;
+}
/// The reply for a heartbeat
#[derive(Debug)]
-enum HeartbeatStatusReply {
+enum HeartbeatStatus {
Alive,
Degraded,
}
-impl Message for HeartbeatStatusReply {}
+impl Message for HeartbeatStatus {
+ type Reply = NoReply;
+}
/// A PluginBuilder that gets used to build a HeartbeatService plugin instance
#[derive(Debug)]
@@ -37,7 +43,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
where
Self: Sized,
{
- HandleTypes::get_handlers_for::<(HeartbeatStatusReply,), HeartbeatService>()
+ HandleTypes::empty()
}
async fn verify_configuration(
@@ -59,13 +65,17 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
let monitored_services = hb_config
.plugins
.iter()
- .map(|name| tedge_comms.get_address_for::<HeartbeatMessages>(name))
+ .map(|name| {
+ tedge_comms
+ .get_address_for::<HeartbeatMessages>(name)
+ .map(|addr| (name.clone(), addr))
+ })
.collect::<Result<Vec<_>, _>>()?;
Ok(HeartbeatService::new(
Duration::from_millis(hb_config.interval),
monitored_services,
)
- .into_untyped::<(HeartbeatStatusReply,)>())
+ .into_untyped::<()>())
}
}
@@ -79,7 +89,7 @@ struct HeartbeatConfig {
/// The HeartbeatService type represents the actual plugin
struct HeartbeatService {
interval_duration: Duration,
- monitored_services: Vec<Address<HeartbeatMessages>>,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
}
#[async_trait]
@@ -92,28 +102,54 @@ impl Plugin for HeartbeatService {
/// the heartbeat. In a real world scenario, that background task would be started here.
async fn setup(&mut self) -> Result<(), PluginError> {
println!(
- "Setting up heartbeat service with interval: {:?}!",
+ "HeartbeatService: Setting up heartbeat service with interval: {:?}!",
self.interval_duration
);
- let mut interval = tokio::time::interval(self.interval_duration);
- let services = self.monitored_services.clone();
-
- tokio::spawn(async move {
- loop {
- interval.tick().await;
- for service in &services {
- println!("Sending heartbeat to service: {:?}", service);
- service.send(Heartbeat).await.unwrap();
+ for service in &self.monitored_services {
+ let mut interval = tokio::time::interval(self.interval_duration);
+ let service = service.clone();
+ tokio::spawn(async move {
+ loop {
+ interval.tick().await;
+ println!(
+ "HeartbeatService: Sending heartbeat to service: {:?}",
+ service
+ );
+ match service
+ .1
+ .send(Heartbeat)
+ .await
+ .unwrap()
+ .wait_for_reply(Duration::from_millis(100))
+ .await
+ {
+ Ok(HeartbeatStatus::Alive) => {
+ println!("HeartbeatService: Received all is well!")
+ }
+ Ok(HeartbeatStatus::Degraded) => {
+ println!(
+ "HeartbeatService: Oh-oh! Plugin '{}' is not doing well",
+ service.0
+ )
+ }
+
+ Err(reply_error) => {
+ println!(
+ "HeartbeatService: Critical error for '{}'! {reply_error}",
+ service.0
+ )
+ }
+ }
}
- }
- });
+ });
+ }
Ok(())
}
/// A plugin author can use this shutdown function to clean resources when thin-edge shuts down
async fn shutdown(&mut self) -> Result<(), PluginError> {
- println!("Shutting down heartbeat service!");
+ println!("HeartbeatService: Shutting down heartbeat service!");
Ok(())
}
}
@@ -121,7 +157,7 @@ impl Plugin for HeartbeatService {
impl HeartbeatService {
fn new(
interval_duration: Duration,
- monitored_services: Vec<Address<HeartbeatMessages>>,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
) -> Self {
Self {
interval_duration,
@@ -130,16 +166,6 @@ impl HeartbeatService {
}
}
-/// The Handle<HeartbeatStatusReply> implementation is called when the HeartbeatService receives a
-/// HeartbeatStatusReply
-#[async_trait]
-impl Handle<HeartbeatStatusReply> for HeartbeatService {
- async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> {
- println!("Received HeartbeatReply!");
- Ok(())
- }
-}
-
/// A plugin that receives heartbeats
struct CriticalServiceBuilder;
@@ -175,19 +201,39 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder {
where
PD: 'async_trait,
{
- Ok(CriticalService {}.into_untyped::<(Heartbeat,)>())
+ Ok(CriticalService {
+ status: tokio::sync::Mutex::new(true),
+ }
+ .into_untyped::<(Heartbeat,)>())
}
}
/// The actual "critical" plugin implementation
-struct CriticalService;
+struct CriticalService {
+ status: tokio::sync::Mutex<bool>,
+}
/// The CriticalService can receive Heartbeat objects, thus it needs a Handle<Heartbeat>
/// implementation
#[async_trait]
impl Handle<Heartbeat> for CriticalService {
- async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> {
- println!("Received Heartbeat!");
+ async fn handle_message(
+ &self,
+ _message: Heartbeat,
+ sender: ReplySender<HeartbeatStatus>,
+ ) -> Result<(), PluginError> {
+ println!("CriticalService: Received Heartbeat!");
+ let mut status = self.status.lock().await;
+
+ let _ = sender.reply(if *status {
+ println!("CriticalService: Sending back alive!");
+ HeartbeatStatus::Alive
+ } else {
+ println!("CriticalService: Sending back degraded!");
+ HeartbeatStatus::Degraded
+ });
+
+ *status = !*status;
Ok(())
}
}
@@ -196,12 +242,12 @@ impl Handle<Heartbeat> for CriticalService {
#[async_trait]
impl Plugin for CriticalService {
async fn setup(&mut self) -> Result<(), PluginError> {
- println!("Setting up critical service!");
+ println!("CriticalService: Setting up critical service!");
Ok(())
}
async fn shutdown(&mut self) -> Result<(), PluginError> {
- println!("Shutting down critical service service!");
+ println!("CriticalService: Shutting down critical service service!");
Ok(())
}
}
@@ -302,7 +348,7 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin {
let config = toml::from_str(
r#"
- interval = 500
+ interval = 5000
plugins = ["critical-service"]
"#,
)
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index 4155e798..fd23dd0a 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -1,14 +1,24 @@
-use std::marker::PhantomData;
+use std::{marker::PhantomData, time::Duration};
use crate::plugin::{Contains, Message, MessageBundle};
+#[doc(hidden)]
+pub type AnySendBox = Box<dyn std::any::Any + Send>;
+
+#[doc(hidden)]
+#[derive(Debug)]
+pub struct InternalMessage {
+ pub(crate) data: AnySendBox,
+ pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnySendBox>,
+}
+
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
-pub type MessageSender = tokio::sync::mpsc::Sender<Box<dyn std::any::Any + Send>>;
+pub type MessageSender = tokio::sync::mpsc::Sender<InternalMessage>;
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
-pub type MessageReceiver = tokio::sync::mpsc::Receiver<Box<dyn std::any::Any + Send>>;
+pub type MessageReceiver = tokio::sync::mpsc::Receiver<InternalMessage>;
/// An address of a plugin that can receive messages a certain type of messages
///
@@ -63,28 +73,88 @@ impl<MB: MessageBundle> Address<MB> {
/// # Details
///
/// For details on sending and receiving, see `tokio::sync::mpsc::Sender`.
- pub async fn send<M: Message>(&self, msg: M) -> Result<(), M>
+ pub async fn send<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M>
where
MB: Contains<M>,
{
+ let (sender, receiver) = tokio::sync::oneshot::channel();
+
self.sender
- .send(Box::new(msg))
+ .send(InternalMessage {
+ data: Box::new(msg),
+ reply_sender: sender,
+ })
+ .await
+ .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?;
+
+ Ok(ReplyReceiver {
+ _pd: PhantomData,
+ reply_recv: receiver,
+ })
+ }
+}
+
+#[derive(Debug)]
+pub struct ReplyReceiver<M> {
+ _pd: PhantomData<M>,
+ reply_recv: tokio::sync::oneshot::Receiver<AnySendBox>,
+}
+
+impl<M: Message> ReplyReceiver<M> {
+ pub async fn wait_for_reply(self, timeout: Duration) -> Result<M, ReplyError> {
+ let data = tokio::time::timeout(timeout, self.reply_recv)
.await
- .map_err(|msg| *msg.0.downcast::<M>().unwrap())
+ .map_err(|_| ReplyError::Timeout)?
+ .map_err(|_| ReplyError::Unknown)?;
+
+ Ok(*data.downcast().expect("Invalid type received"))
}
}
+#[derive(Debug)]
+pub struct ReplySender<M> {
+ _pd: PhantomData<M>,
+ reply_sender: tokio::sync::oneshot::Sender<AnySendBox>,
+}
+
+impl<M: Message> ReplySender<M> {
+ pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnySendBox>) -> Self {
+ Self {
+ _pd: PhantomData,
+ reply_sender,
+ }
+ }
+
+ pub fn reply(self, msg: M) -> Result<(), M> {
+ self.reply_sender
+ .send(Box::new(msg))
+ .map_err(|msg| *msg.downcast::<M>().unwrap())
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum ReplyError {
+ #[error("There was no response before timeout")]
+ Timeout,
+ #[error("Could not send reply")]
+ Unknown
+}
+
#[cfg(test)]
mod tests {
use crate::{make_message_bundle, plugin::Message, Address};
struct Foo;
- impl Message for Foo {}
+ impl Message for Foo {
+ type Reply = Bar;
+ }
struct Bar;
- impl Message for Bar {}
+ impl Message for Bar {
+ type Reply = Bar;
+ }
make_message_bundle!(struct FooBar(Foo, Bar));
diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs
index cf84d06c..363967e0 100644
--- a/crates/core/tedge_api/src/lib.rs
+++ b/crates/core/tedge_api/src/lib.rs
@@ -1,10 +1,10 @@
-#![deny(
+#![cfg_attr(test, deny(
missing_docs,
missing_debug_implementations,
unreachable_pub,
unsafe_code,
variant_size_differences
-)]
+))]
#![doc = include_str!("../README.md")]
/// All the parts required to write a plugin
diff --git a/crates/core/tedge_api/src/message.rs b/crates/core/tedge_api/src/message.rs
index cf33fe43..5ff9f633 100644
--- a/crates/core/tedge_api/src/message.rs
+++ b/crates/core/tedge_api/src/message.rs
@@ -1,9 +1,19 @@
use crate::plugin::Message;
+#[derive(Debug)]
+/// A message which cannot be constructed and thus cannot be used to reply with
+pub enum NoReply {}
+
+impl Message for NoReply {
+ type Reply = NoReply;
+}
+
/// A message to tell the core to stop thin-edge
#[derive(Debug)]
pub struct StopCore;
-impl Message for StopCore {}
+impl Message for StopCore {
+ type Reply = NoReply;
+}
crate::make_message_bundle!(pub struct CoreMessages(StopCore));
diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs
index 0d23950e..f7bf70a1 100644
--- a/crates/core/tedge_api/src/plugin.rs
+++ b/crates/core/tedge_api/src/plugin.rs
@@ -14,7 +14,12 @@ use downcast_rs::{impl_downcast, DowncastSync};
use async_trait::async_trait;
-use crate::{error::PluginError, message::CoreMessages, Address};
+use crate::{
+ address::{InternalMessage, ReplySender},
+ error::PluginError,
+ message::CoreMessages,
+ Address,
+};
/// The communication struct to interface with the core of ThinEdge
///
@@ -266,9 +271,13 @@ impl_downcast!(sync Plugin);
/// A Plugin that is able to receive different types of messages would have multiple
/// implementations of this trait.
#[async_trait]
-pub trait Handle<Msg> {
+pub trait Handle<Msg: Message> {
/// Handle a message of type `Msg` that gets send to this plugin
- async fn handle_message(&self, message: Msg) -> Result<(), PluginError>;
+ async fn handle_message(
+ &self,
+ message: Msg,
+ sender: ReplySender<Msg::Reply>,
+ ) -> Result<(), PluginError>;
}
#[derive(Debug)]
@@ -349,6 +358,12 @@ impl HandleTypes {
pub fn get_handlers_for<M: MessageBundle, Plugin: DoesHandle<M>>() -> HandleTypes {
HandleTypes(M::get_ids())
}
+
+ /// Empty list of types. A plugin that does not handle anything will not be able to receive
+ /// messages except through replies sent with [`Reply`](crate::address::Reply)
+ pub fn empty() -> HandleTypes {
+ HandleTypes(Vec::with_capacity(0))
+ }
}
impl From<HandleTypes> for HashSet<(&'static str, TypeId)> {
@@ -361,7 +376,9 @@ impl From<HandleTypes> for HashSet<(&'static str, TypeId)> {
///
/// This trait is a marker trait for all types that can be used as messages which can be send
/// between plugins in thin-edge.
-pub trait Message: 'static + Send {}
+pub trait Message: 'static + Send + std::fmt::Debug {
+ type Reply: Message;
+}
/// A bundle of messages
///
@@ -392,7 +409,7 @@ pub trait PluginExt: Plugin {
impl<P: Plugin> PluginExt for P {}
type PluginHandlerFn =
- for<'r> fn(&'r dyn Any, Box<dyn Any + Send>) -> BoxFuture<'r, Result<(), PluginError>>;
+ for<'r> fn(&'r dyn Any, InternalMessage) -> BoxFuture<'r, Result<(), PluginError>>;
/// A plugin that is instantiated
///
@@ -412,7 +429,7 @@ impl BuiltPlugin {
#[must_use]
pub fn handle_message(
&self,
- message: Box<dyn Any + Send>,
+ message: InternalMessage,
) -> BoxFuture<'_, Result<(), PluginError>> {
(self.handler)((&*self.plugin).as_any(), message)
}
@@ -443,7 +460,7 @@ macro_rules! impl_does_handle_tuple {
fn into_built_plugin(self) -> BuiltPlugin {
fn handle_message<'a, $cur: Message, $($rest: Message,)* PLUG: Plugin + Handle<$cur> $(+ Handle<$rest>)*>(
plugin: &'a dyn Any,
- message: Box<dyn Any + Send>,
+ message: InternalMessage,
) -> BoxFuture<'a, Result<(), PluginError>> {
let plug = match plugin.downcast_ref::<PLUG>() {
Some(p) => p,
@@ -454,14 +471,23 @@ macro_rules! impl_does_handle_tuple {
futures::FutureExt::boxed(async move {
#![allow(unused)]
+ let InternalMessage { data: message, reply_sender } = message;
+
+
let message = match message.downcast::<$cur>() {
- Ok(message) => return plug.handle_message(*message).await,
+ Ok(message) => {
+ let reply_sender = crate::address::ReplySender::new(reply_sender);
+ return plug.handle_message(*message, reply_sender).await
+ }
Err(m) => m,
};
$(
let message = match message.downcast::<$rest>() {
- Ok(message) => return plug.handle_message(*message).await,
+ Ok(message) => {
+ let reply_sender = crate::address::ReplySender::new(reply_sender);
+ return plug.handle_message(*message, reply_sender).await
+ }
Err(m) => m,
};
)*
@@ -486,6 +512,27 @@ impl<M: Message> MessageBundle for M {
}
}
+impl MessageBundle for () {
+ fn get_ids() -> Vec<(&'static str, TypeId)> {
+ vec![]
+ }
+}
+
+impl<P: Plugin> DoesHandle<()> for P {
+ fn into_built_plugin(self) -> BuiltPlugin {
+ fn handle_message<'a, PLUG: Plugin>(
+ _plugin: &'a dyn Any,
+ _message: InternalMessage,
+ ) -> BoxFuture<'a, Result<(), PluginError>> {
+ unreachable!()
+ }
+ BuiltPlugin {
+ plugin: Box::new(self),
+ handler: handle_message::<P>,
+ }
+ }
+}
+
macro_rules! impl_msg_bundle_tuple {
() => {};
(@rec_tuple $cur:ident) => {