diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-04-29 09:30:10 +0200 |
---|---|---|
committer | Marcel Müller <m.mueller@ifm.com> | 2022-05-05 15:06:20 +0200 |
commit | abab64c856c0c299b13fcc1857a5b78f449e4a9e (patch) | |
tree | 1dfacf02e269b25328275367f20c70369a2797db /crates/core/tedge_api/src/address.rs | |
parent | a398c9783b54892aa1f625685beb7f153d7f7e6d (diff) |
Add AnyMessage & AnyMessages
This patch adds the ability to send any kind of messages to plugins that
declare handling `AnyMessage` + `AnyMessages`.
It does it with these changes:
- Remove the `Message::Reply` associated type
- This allows `Message` to be used as a trait object
- Add the `MessageType` which allows to identify messages
- This permits `AnyMessage` to be received by plugins
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates/core/tedge_api/src/address.rs')
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 80 |
1 files changed, 43 insertions, 37 deletions
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index 619941ad..93a51450 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -2,16 +2,19 @@ use std::{marker::PhantomData, time::Duration}; use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; -use crate::plugin::Message; +use crate::{ + message::MessageType, + plugin::{AcceptsReplies, Message}, +}; #[doc(hidden)] -pub type AnySendBox = Box<dyn std::any::Any + Send>; +pub type AnyMessageBox = Box<dyn Message>; #[doc(hidden)] #[derive(Debug)] pub struct InternalMessage { - pub(crate) data: AnySendBox, - pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnySendBox>, + pub(crate) data: AnyMessageBox, + pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>, } /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME @@ -79,7 +82,7 @@ impl<RB: ReceiverBundle> Address<RB> { /// could become an issue use something akin to timeout (like /// [`timeout`](tokio::time::timeout)). /// For details on sending and receiving, see `tokio::sync::mpsc::Sender`. - pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> + pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> where RB: Contains<M>, { @@ -93,7 +96,7 @@ impl<RB: ReceiverBundle> Address<RB> { .await .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?; - Ok(ReplyReceiver { + Ok(ReplyReceiverFor { _pd: PhantomData, reply_recv: receiver, }) @@ -111,7 +114,7 @@ impl<RB: ReceiverBundle> Address<RB> { /// /// The error is returned if the receiving side (the plugin that is addressed) cannot currently /// receive messages (either because it is closed or the queue is full). - pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> { + pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> { let (sender, receiver) = tokio::sync::oneshot::channel(); self.sender @@ -125,7 +128,7 @@ impl<RB: ReceiverBundle> Address<RB> { } })?; - Ok(ReplyReceiver { + Ok(ReplyReceiverFor { _pd: PhantomData, reply_recv: receiver, }) @@ -142,7 +145,7 @@ impl<RB: ReceiverBundle> Address<RB> { &self, msg: M, timeout: Duration, - ) -> Result<ReplyReceiver<M::Reply>, M> { + ) -> Result<ReplyReceiverFor<M>, M> { let (sender, receiver) = tokio::sync::oneshot::channel(); self.sender @@ -160,7 +163,7 @@ impl<RB: ReceiverBundle> Address<RB> { } })?; - Ok(ReplyReceiver { + Ok(ReplyReceiverFor { _pd: PhantomData, reply_recv: receiver, }) @@ -169,12 +172,12 @@ impl<RB: ReceiverBundle> Address<RB> { #[derive(Debug)] /// Listener that allows one to wait for a reply as sent through [`Address::send_and_wait`] -pub struct ReplyReceiver<M> { +pub struct ReplyReceiverFor<M> { _pd: PhantomData<fn(M)>, - reply_recv: tokio::sync::oneshot::Receiver<AnySendBox>, + reply_recv: tokio::sync::oneshot::Receiver<AnyMessageBox>, } -impl<M: Message> ReplyReceiver<M> { +impl<M: Message> ReplyReceiverFor<M> { /// Wait for a reply until for the duration given in `timeout` /// /// ## Note @@ -186,7 +189,11 @@ impl<M: Message> ReplyReceiver<M> { /// It is also important, that just because a given `M: Message` has a `M::Reply` type set, /// that the plugin that a message was sent to does _not_ have to reply with it. It can choose /// to not do so. - pub async fn wait_for_reply(self, timeout: Duration) -> Result<M, ReplyError> { + pub async fn wait_for_reply<R>(self, timeout: Duration) -> Result<R, ReplyError> + where + R: Message, + M: AcceptsReplies<Reply = R>, + { let data = tokio::time::timeout(timeout, self.reply_recv) .await .map_err(|_| ReplyError::Timeout)? @@ -199,13 +206,13 @@ impl<M: Message> ReplyReceiver<M> { #[derive(Debug)] /// Allows the [`Handle`](crate::plugin::Handle) implementation to reply with a given message as /// specified by the currently handled message. -pub struct ReplySender<M> { +pub struct ReplySenderFor<M> { _pd: PhantomData<fn(M)>, - reply_sender: tokio::sync::oneshot::Sender<AnySendBox>, + reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>, } -impl<M: Message> ReplySender<M> { - pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnySendBox>) -> Self { +impl<M: Message> ReplySenderFor<M> { + pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>) -> Self { Self { _pd: PhantomData, reply_sender, @@ -213,7 +220,11 @@ impl<M: Message> ReplySender<M> { } /// Reply to the originating plugin with the given message - pub fn reply(self, msg: M) -> Result<(), M> { + pub fn reply<R>(self, msg: R) -> Result<(), M> + where + R: Message, + M: AcceptsReplies<Reply = R>, + { self.reply_sender .send(Box::new(msg)) .map_err(|msg| *msg.downcast::<M>().unwrap()) @@ -244,7 +255,7 @@ pub enum ReplyError { #[doc(hidden)] pub trait ReceiverBundle: Send + 'static { - fn get_ids() -> Vec<(&'static str, std::any::TypeId)>; + fn get_ids() -> Vec<MessageType>; } #[doc(hidden)] @@ -258,21 +269,17 @@ pub trait Contains<M: Message> {} /// ## Example /// /// ```rust -/// # use tedge_api::{Message, make_receiver_bundle, message::NoReply}; +/// # use tedge_api::{Message, make_receiver_bundle}; /// /// #[derive(Debug)] /// struct IntMessage(u8); /// -/// impl Message for IntMessage { -/// type Reply = NoReply; -/// } +/// impl Message for IntMessage { } /// /// #[derive(Debug)] /// struct StatusMessage(String); /// -/// impl Message for StatusMessage { -/// type Reply = NoReply; -/// } +/// impl Message for StatusMessage { } /// /// make_receiver_bundle!(struct MessageReceiver(IntMessage, StatusMessage)); /// @@ -289,9 +296,9 @@ macro_rules! make_receiver_bundle { impl $crate::address::ReceiverBundle for $name { #[allow(unused_parens)] - fn get_ids() -> Vec<(&'static str, std::any::TypeId)> { + fn get_ids() -> Vec<$crate::message::MessageType> { vec![ - $((std::any::type_name::<$msg>(), std::any::TypeId::of::<$msg>())),+ + $($crate::message::MessageType::for_message::<$msg>()),+ ] } } @@ -305,25 +312,24 @@ mod tests { use static_assertions::{assert_impl_all, assert_not_impl_any}; use crate::{ - address::{ReplyReceiver, ReplySender}, + address::{ReplyReceiverFor, ReplySenderFor}, make_receiver_bundle, - plugin::Message, + plugin::{AcceptsReplies, Message}, Address, }; #[derive(Debug)] struct Foo; - impl Message for Foo { + impl Message for Foo {} + impl AcceptsReplies for Foo { type Reply = Bar; } #[derive(Debug)] struct Bar; - impl Message for Bar { - type Reply = Bar; - } + impl Message for Bar {} make_receiver_bundle!(struct FooBar(Foo, Bar)); @@ -344,6 +350,6 @@ mod tests { assert_impl_all!(Address<FooBar>: Clone, Send, Sync); assert_not_impl_any!(NotSync: Send, Sync); - assert_impl_all!(ReplySender<NotSync>: Send, Sync); - assert_impl_all!(ReplyReceiver<NotSync>: Send, Sync); + assert_impl_all!(ReplySenderFor<NotSync>: Send, Sync); + assert_impl_all!(ReplyReceiverFor<NotSync>: Send, Sync); } |