diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-03-19 12:18:12 +0100 |
---|---|---|
committer | Marcel Müller <m.mueller@ifm.com> | 2022-03-21 08:53:49 +0100 |
commit | 196f7600234bc131956fad295959928153ded638 (patch) | |
tree | a22ca17b606cd17c7b628ecc5f4acce1a43464b1 /crates | |
parent | 2dc415263c691d021f8e60f5f72a0800e9b975de (diff) |
Add reply functionality to messages
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates')
-rw-r--r-- | crates/core/tedge_api/examples/heartbeat.rs | 122 | ||||
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 86 | ||||
-rw-r--r-- | crates/core/tedge_api/src/lib.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_api/src/message.rs | 12 | ||||
-rw-r--r-- | crates/core/tedge_api/src/plugin.rs | 65 |
5 files changed, 231 insertions, 58 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 12554c17..d2fc2f21 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -6,6 +6,8 @@ use std::{ use async_trait::async_trait; use tedge_api::{ + address::ReplySender, + message::NoReply, plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt}, Address, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; @@ -13,15 +15,19 @@ use tedge_api::{ /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] struct Heartbeat; -impl Message for Heartbeat {} +impl Message for Heartbeat { + type Reply = HeartbeatStatus; +} /// The reply for a heartbeat #[derive(Debug)] -enum HeartbeatStatusReply { +enum HeartbeatStatus { Alive, Degraded, } -impl Message for HeartbeatStatusReply {} +impl Message for HeartbeatStatus { + type Reply = NoReply; +} /// A PluginBuilder that gets used to build a HeartbeatService plugin instance #[derive(Debug)] @@ -37,7 +43,7 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder { where Self: Sized, { - HandleTypes::get_handlers_for::<(HeartbeatStatusReply,), HeartbeatService>() + HandleTypes::empty() } async fn verify_configuration( @@ -59,13 +65,17 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder { let monitored_services = hb_config .plugins .iter() - .map(|name| tedge_comms.get_address_for::<HeartbeatMessages>(name)) + .map(|name| { + tedge_comms + .get_address_for::<HeartbeatMessages>(name) + .map(|addr| (name.clone(), addr)) + }) .collect::<Result<Vec<_>, _>>()?; Ok(HeartbeatService::new( Duration::from_millis(hb_config.interval), monitored_services, ) - .into_untyped::<(HeartbeatStatusReply,)>()) + .into_untyped::<()>()) } } @@ -79,7 +89,7 @@ struct HeartbeatConfig { /// The HeartbeatService type represents the actual plugin struct HeartbeatService { interval_duration: Duration, - monitored_services: Vec<Address<HeartbeatMessages>>, + monitored_services: Vec<(String, Address<HeartbeatMessages>)>, } #[async_trait] @@ -92,28 +102,54 @@ impl Plugin for HeartbeatService { /// the heartbeat. In a real world scenario, that background task would be started here. async fn setup(&mut self) -> Result<(), PluginError> { println!( - "Setting up heartbeat service with interval: {:?}!", + "HeartbeatService: Setting up heartbeat service with interval: {:?}!", self.interval_duration ); - let mut interval = tokio::time::interval(self.interval_duration); - let services = self.monitored_services.clone(); - - tokio::spawn(async move { - loop { - interval.tick().await; - for service in &services { - println!("Sending heartbeat to service: {:?}", service); - service.send(Heartbeat).await.unwrap(); + for service in &self.monitored_services { + let mut interval = tokio::time::interval(self.interval_duration); + let service = service.clone(); + tokio::spawn(async move { + loop { + interval.tick().await; + println!( + "HeartbeatService: Sending heartbeat to service: {:?}", + service + ); + match service + .1 + .send(Heartbeat) + .await + .unwrap() + .wait_for_reply(Duration::from_millis(100)) + .await + { + Ok(HeartbeatStatus::Alive) => { + println!("HeartbeatService: Received all is well!") + } + Ok(HeartbeatStatus::Degraded) => { + println!( + "HeartbeatService: Oh-oh! Plugin '{}' is not doing well", + service.0 + ) + } + + Err(reply_error) => { + println!( + "HeartbeatService: Critical error for '{}'! {reply_error}", + service.0 + ) + } + } } - } - }); + }); + } Ok(()) } /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down async fn shutdown(&mut self) -> Result<(), PluginError> { - println!("Shutting down heartbeat service!"); + println!("HeartbeatService: Shutting down heartbeat service!"); Ok(()) } } @@ -121,7 +157,7 @@ impl Plugin for HeartbeatService { impl HeartbeatService { fn new( interval_duration: Duration, - monitored_services: Vec<Address<HeartbeatMessages>>, + monitored_services: Vec<(String, Address<HeartbeatMessages>)>, ) -> Self { Self { interval_duration, @@ -130,16 +166,6 @@ impl HeartbeatService { } } -/// The Handle<HeartbeatStatusReply> implementation is called when the HeartbeatService receives a -/// HeartbeatStatusReply -#[async_trait] -impl Handle<HeartbeatStatusReply> for HeartbeatService { - async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> { - println!("Received HeartbeatReply!"); - Ok(()) - } -} - /// A plugin that receives heartbeats struct CriticalServiceBuilder; @@ -175,19 +201,39 @@ impl<PD: PluginDirectory> PluginBuilder<PD> for CriticalServiceBuilder { where PD: 'async_trait, { - Ok(CriticalService {}.into_untyped::<(Heartbeat,)>()) + Ok(CriticalService { + status: tokio::sync::Mutex::new(true), + } + .into_untyped::<(Heartbeat,)>()) } } /// The actual "critical" plugin implementation -struct CriticalService; +struct CriticalService { + status: tokio::sync::Mutex<bool>, +} /// The CriticalService can receive Heartbeat objects, thus it needs a Handle<Heartbeat> /// implementation #[async_trait] impl Handle<Heartbeat> for CriticalService { - async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> { - println!("Received Heartbeat!"); + async fn handle_message( + &self, + _message: Heartbeat, + sender: ReplySender<HeartbeatStatus>, + ) -> Result<(), PluginError> { + println!("CriticalService: Received Heartbeat!"); + let mut status = self.status.lock().await; + + let _ = sender.reply(if *status { + println!("CriticalService: Sending back alive!"); + HeartbeatStatus::Alive + } else { + println!("CriticalService: Sending back degraded!"); + HeartbeatStatus::Degraded + }); + + *status = !*status; Ok(()) } } @@ -196,12 +242,12 @@ impl Handle<Heartbeat> for CriticalService { #[async_trait] impl Plugin for CriticalService { async fn setup(&mut self) -> Result<(), PluginError> { - println!("Setting up critical service!"); + println!("CriticalService: Setting up critical service!"); Ok(()) } async fn shutdown(&mut self) -> Result<(), PluginError> { - println!("Shutting down critical service service!"); + println!("CriticalService: Shutting down critical service service!"); Ok(()) } } @@ -302,7 +348,7 @@ async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { let config = toml::from_str( r#" - interval = 500 + interval = 5000 plugins = ["critical-service"] "#, ) diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index 4155e798..fd23dd0a 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -1,14 +1,24 @@ -use std::marker::PhantomData; +use std::{marker::PhantomData, time::Duration}; use crate::plugin::{Contains, Message, MessageBundle}; +#[doc(hidden)] +pub type AnySendBox = Box<dyn std::any::Any + Send>; + +#[doc(hidden)] +#[derive(Debug)] +pub struct InternalMessage { + pub(crate) data: AnySendBox, + pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnySendBox>, +} + /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME #[doc(hidden)] -pub type MessageSender = tokio::sync::mpsc::Sender<Box<dyn std::any::Any + Send>>; +pub type MessageSender = tokio::sync::mpsc::Sender<InternalMessage>; /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME #[doc(hidden)] -pub type MessageReceiver = tokio::sync::mpsc::Receiver<Box<dyn std::any::Any + Send>>; +pub type MessageReceiver = tokio::sync::mpsc::Receiver<InternalMessage>; /// An address of a plugin that can receive messages a certain type of messages /// @@ -63,28 +73,88 @@ impl<MB: MessageBundle> Address<MB> { /// # Details /// /// For details on sending and receiving, see `tokio::sync::mpsc::Sender`. - pub async fn send<M: Message>(&self, msg: M) -> Result<(), M> + pub async fn send<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> where MB: Contains<M>, { + let (sender, receiver) = tokio::sync::oneshot::channel(); + self.sender - .send(Box::new(msg)) + .send(InternalMessage { + data: Box::new(msg), + reply_sender: sender, + }) + .await + .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?; + + Ok(ReplyReceiver { + _pd: PhantomData, + reply_recv: receiver, + }) + } +} + +#[derive(Debug)] +pub struct ReplyReceiver<M> { + _pd: PhantomData<M>, + reply_recv: tokio::sync::oneshot::Receiver<AnySendBox>, +} + +impl<M: Message> ReplyReceiver<M> { + pub async fn wait_for_reply(self, timeout: Duration) -> Result<M, ReplyError> { + let data = tokio::time::timeout(timeout, self.reply_recv) .await - .map_err(|msg| *msg.0.downcast::<M>().unwrap()) + .map_err(|_| ReplyError::Timeout)? + .map_err(|_| ReplyError::Unknown)?; + + Ok(*data.downcast().expect("Invalid type received")) } } +#[derive(Debug)] +pub struct ReplySender<M> { + _pd: PhantomData<M>, + reply_sender: tokio::sync::oneshot::Sender<AnySendBox>, +} + +impl<M: Message> ReplySender<M> { + pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnySendBox>) -> Self { + Self { + _pd: PhantomData, + reply_sender, + } + } + + pub fn reply(self, msg: M) -> Result<(), M> { + self.reply_sender + .send(Box::new(msg)) + .map_err(|msg| *msg.downcast::<M>().unwrap()) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ReplyError { + #[error("There was no response before timeout")] + Timeout, + #[error("Could not send reply")] + Unknown +} + #[cfg(test)] mod tests { use crate::{make_message_bundle, plugin::Message, Address}; struct Foo; - impl Message for Foo {} + impl Message for Foo { + type Reply = Bar; + } struct Bar; - impl Message for Bar {} + impl Message for Bar { + type Reply = Bar; + } make_message_bundle!(struct FooBar(Foo, Bar)); diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index cf84d06c..363967e0 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -1,10 +1,10 @@ -#![deny( +#![cfg_attr(test, deny( missing_docs, missing_debug_implementations, unreachable_pub, unsafe_code, variant_size_differences -)] +))] #![doc = include_str!("../README.md")] /// All the parts required to write a plugin diff --git a/crates/core/tedge_api/src/message.rs b/crates/core/tedge_api/src/message.rs index cf33fe43..5ff9f633 100644 --- a/crates/core/tedge_api/src/message.rs +++ b/crates/core/tedge_api/src/message.rs @@ -1,9 +1,19 @@ use crate::plugin::Message; +#[derive(Debug)] +/// A message which cannot be constructed and thus cannot be used to reply with +pub enum NoReply {} + +impl Message for NoReply { + type Reply = NoReply; +} + /// A message to tell the core to stop thin-edge #[derive(Debug)] pub struct StopCore; -impl Message for StopCore {} +impl Message for StopCore { + type Reply = NoReply; +} crate::make_message_bundle!(pub struct CoreMessages(StopCore)); diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs index 0d23950e..f7bf70a1 100644 --- a/crates/core/tedge_api/src/plugin.rs +++ b/crates/core/tedge_api/src/plugin.rs @@ -14,7 +14,12 @@ use downcast_rs::{impl_downcast, DowncastSync}; use async_trait::async_trait; -use crate::{error::PluginError, message::CoreMessages, Address}; +use crate::{ + address::{InternalMessage, ReplySender}, + error::PluginError, + message::CoreMessages, + Address, +}; /// The communication struct to interface with the core of ThinEdge /// @@ -266,9 +271,13 @@ impl_downcast!(sync Plugin); /// A Plugin that is able to receive different types of messages would have multiple /// implementations of this trait. #[async_trait] -pub trait Handle<Msg> { +pub trait Handle<Msg: Message> { /// Handle a message of type `Msg` that gets send to this plugin - async fn handle_message(&self, message: Msg) -> Result<(), PluginError>; + async fn handle_message( + &self, + message: Msg, + sender: ReplySender<Msg::Reply>, + ) -> Result<(), PluginError>; } #[derive(Debug)] @@ -349,6 +358,12 @@ impl HandleTypes { pub fn get_handlers_for<M: MessageBundle, Plugin: DoesHandle<M>>() -> HandleTypes { HandleTypes(M::get_ids()) } + + /// Empty list of types. A plugin that does not handle anything will not be able to receive + /// messages except through replies sent with [`Reply`](crate::address::Reply) + pub fn empty() -> HandleTypes { + HandleTypes(Vec::with_capacity(0)) + } } impl From<HandleTypes> for HashSet<(&'static str, TypeId)> { @@ -361,7 +376,9 @@ impl From<HandleTypes> for HashSet<(&'static str, TypeId)> { /// /// This trait is a marker trait for all types that can be used as messages which can be send /// between plugins in thin-edge. -pub trait Message: 'static + Send {} +pub trait Message: 'static + Send + std::fmt::Debug { + type Reply: Message; +} /// A bundle of messages /// @@ -392,7 +409,7 @@ pub trait PluginExt: Plugin { impl<P: Plugin> PluginExt for P {} type PluginHandlerFn = - for<'r> fn(&'r dyn Any, Box<dyn Any + Send>) -> BoxFuture<'r, Result<(), PluginError>>; + for<'r> fn(&'r dyn Any, InternalMessage) -> BoxFuture<'r, Result<(), PluginError>>; /// A plugin that is instantiated /// @@ -412,7 +429,7 @@ impl BuiltPlugin { #[must_use] pub fn handle_message( &self, - message: Box<dyn Any + Send>, + message: InternalMessage, ) -> BoxFuture<'_, Result<(), PluginError>> { (self.handler)((&*self.plugin).as_any(), message) } @@ -443,7 +460,7 @@ macro_rules! impl_does_handle_tuple { fn into_built_plugin(self) -> BuiltPlugin { fn handle_message<'a, $cur: Message, $($rest: Message,)* PLUG: Plugin + Handle<$cur> $(+ Handle<$rest>)*>( plugin: &'a dyn Any, - message: Box<dyn Any + Send>, + message: InternalMessage, ) -> BoxFuture<'a, Result<(), PluginError>> { let plug = match plugin.downcast_ref::<PLUG>() { Some(p) => p, @@ -454,14 +471,23 @@ macro_rules! impl_does_handle_tuple { futures::FutureExt::boxed(async move { #![allow(unused)] + let InternalMessage { data: message, reply_sender } = message; + + let message = match message.downcast::<$cur>() { - Ok(message) => return plug.handle_message(*message).await, + Ok(message) => { + let reply_sender = crate::address::ReplySender::new(reply_sender); + return plug.handle_message(*message, reply_sender).await + } Err(m) => m, }; $( let message = match message.downcast::<$rest>() { - Ok(message) => return plug.handle_message(*message).await, + Ok(message) => { + let reply_sender = crate::address::ReplySender::new(reply_sender); + return plug.handle_message(*message, reply_sender).await + } Err(m) => m, }; )* @@ -486,6 +512,27 @@ impl<M: Message> MessageBundle for M { } } +impl MessageBundle for () { + fn get_ids() -> Vec<(&'static str, TypeId)> { + vec![] + } +} + +impl<P: Plugin> DoesHandle<()> for P { + fn into_built_plugin(self) -> BuiltPlugin { + fn handle_message<'a, PLUG: Plugin>( + _plugin: &'a dyn Any, + _message: InternalMessage, + ) -> BoxFuture<'a, Result<(), PluginError>> { + unreachable!() + } + BuiltPlugin { + plugin: Box::new(self), + handler: handle_message::<P>, + } + } +} + macro_rules! impl_msg_bundle_tuple { () => {}; (@rec_tuple $cur:ident) => { |