summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_api/src/address.rs
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-04-29 09:30:10 +0200
committerMarcel Müller <m.mueller@ifm.com>2022-05-05 15:06:20 +0200
commitabab64c856c0c299b13fcc1857a5b78f449e4a9e (patch)
tree1dfacf02e269b25328275367f20c70369a2797db /crates/core/tedge_api/src/address.rs
parenta398c9783b54892aa1f625685beb7f153d7f7e6d (diff)
Add AnyMessage & AnyMessages
This patch adds the ability to send any kind of messages to plugins that declare handling `AnyMessage` + `AnyMessages`. It does it with these changes: - Remove the `Message::Reply` associated type - This allows `Message` to be used as a trait object - Add the `MessageType` which allows to identify messages - This permits `AnyMessage` to be received by plugins Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates/core/tedge_api/src/address.rs')
-rw-r--r--crates/core/tedge_api/src/address.rs80
1 files changed, 43 insertions, 37 deletions
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index 619941ad..93a51450 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -2,16 +2,19 @@ use std::{marker::PhantomData, time::Duration};
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
-use crate::plugin::Message;
+use crate::{
+ message::MessageType,
+ plugin::{AcceptsReplies, Message},
+};
#[doc(hidden)]
-pub type AnySendBox = Box<dyn std::any::Any + Send>;
+pub type AnyMessageBox = Box<dyn Message>;
#[doc(hidden)]
#[derive(Debug)]
pub struct InternalMessage {
- pub(crate) data: AnySendBox,
- pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnySendBox>,
+ pub(crate) data: AnyMessageBox,
+ pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>,
}
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
@@ -79,7 +82,7 @@ impl<RB: ReceiverBundle> Address<RB> {
/// could become an issue use something akin to timeout (like
/// [`timeout`](tokio::time::timeout)).
/// For details on sending and receiving, see `tokio::sync::mpsc::Sender`.
- pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M>
+ pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M>
where
RB: Contains<M>,
{
@@ -93,7 +96,7 @@ impl<RB: ReceiverBundle> Address<RB> {
.await
.map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?;
- Ok(ReplyReceiver {
+ Ok(ReplyReceiverFor {
_pd: PhantomData,
reply_recv: receiver,
})
@@ -111,7 +114,7 @@ impl<RB: ReceiverBundle> Address<RB> {
///
/// The error is returned if the receiving side (the plugin that is addressed) cannot currently
/// receive messages (either because it is closed or the queue is full).
- pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> {
+ pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
@@ -125,7 +128,7 @@ impl<RB: ReceiverBundle> Address<RB> {
}
})?;
- Ok(ReplyReceiver {
+ Ok(ReplyReceiverFor {
_pd: PhantomData,
reply_recv: receiver,
})
@@ -142,7 +145,7 @@ impl<RB: ReceiverBundle> Address<RB> {
&self,
msg: M,
timeout: Duration,
- ) -> Result<ReplyReceiver<M::Reply>, M> {
+ ) -> Result<ReplyReceiverFor<M>, M> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
@@ -160,7 +163,7 @@ impl<RB: ReceiverBundle> Address<RB> {
}
})?;
- Ok(ReplyReceiver {
+ Ok(ReplyReceiverFor {
_pd: PhantomData,
reply_recv: receiver,
})
@@ -169,12 +172,12 @@ impl<RB: ReceiverBundle> Address<RB> {
#[derive(Debug)]
/// Listener that allows one to wait for a reply as sent through [`Address::send_and_wait`]
-pub struct ReplyReceiver<M> {
+pub struct ReplyReceiverFor<M> {
_pd: PhantomData<fn(M)>,
- reply_recv: tokio::sync::oneshot::Receiver<AnySendBox>,
+ reply_recv: tokio::sync::oneshot::Receiver<AnyMessageBox>,
}
-impl<M: Message> ReplyReceiver<M> {
+impl<M: Message> ReplyReceiverFor<M> {
/// Wait for a reply until for the duration given in `timeout`
///
/// ## Note
@@ -186,7 +189,11 @@ impl<M: Message> ReplyReceiver<M> {
/// It is also important, that just because a given `M: Message` has a `M::Reply` type set,
/// that the plugin that a message was sent to does _not_ have to reply with it. It can choose
/// to not do so.
- pub async fn wait_for_reply(self, timeout: Duration) -> Result<M, ReplyError> {
+ pub async fn wait_for_reply<R>(self, timeout: Duration) -> Result<R, ReplyError>
+ where
+ R: Message,
+ M: AcceptsReplies<Reply = R>,
+ {
let data = tokio::time::timeout(timeout, self.reply_recv)
.await
.map_err(|_| ReplyError::Timeout)?
@@ -199,13 +206,13 @@ impl<M: Message> ReplyReceiver<M> {
#[derive(Debug)]
/// Allows the [`Handle`](crate::plugin::Handle) implementation to reply with a given message as
/// specified by the currently handled message.
-pub struct ReplySender<M> {
+pub struct ReplySenderFor<M> {
_pd: PhantomData<fn(M)>,
- reply_sender: tokio::sync::oneshot::Sender<AnySendBox>,
+ reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>,
}
-impl<M: Message> ReplySender<M> {
- pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnySendBox>) -> Self {
+impl<M: Message> ReplySenderFor<M> {
+ pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>) -> Self {
Self {
_pd: PhantomData,
reply_sender,
@@ -213,7 +220,11 @@ impl<M: Message> ReplySender<M> {
}
/// Reply to the originating plugin with the given message
- pub fn reply(self, msg: M) -> Result<(), M> {
+ pub fn reply<R>(self, msg: R) -> Result<(), M>
+ where
+ R: Message,
+ M: AcceptsReplies<Reply = R>,
+ {
self.reply_sender
.send(Box::new(msg))
.map_err(|msg| *msg.downcast::<M>().unwrap())
@@ -244,7 +255,7 @@ pub enum ReplyError {
#[doc(hidden)]
pub trait ReceiverBundle: Send + 'static {
- fn get_ids() -> Vec<(&'static str, std::any::TypeId)>;
+ fn get_ids() -> Vec<MessageType>;
}
#[doc(hidden)]
@@ -258,21 +269,17 @@ pub trait Contains<M: Message> {}
/// ## Example
///
/// ```rust
-/// # use tedge_api::{Message, make_receiver_bundle, message::NoReply};
+/// # use tedge_api::{Message, make_receiver_bundle};
///
/// #[derive(Debug)]
/// struct IntMessage(u8);
///
-/// impl Message for IntMessage {
-/// type Reply = NoReply;
-/// }
+/// impl Message for IntMessage { }
///
/// #[derive(Debug)]
/// struct StatusMessage(String);
///
-/// impl Message for StatusMessage {
-/// type Reply = NoReply;
-/// }
+/// impl Message for StatusMessage { }
///
/// make_receiver_bundle!(struct MessageReceiver(IntMessage, StatusMessage));
///
@@ -289,9 +296,9 @@ macro_rules! make_receiver_bundle {
impl $crate::address::ReceiverBundle for $name {
#[allow(unused_parens)]
- fn get_ids() -> Vec<(&'static str, std::any::TypeId)> {
+ fn get_ids() -> Vec<$crate::message::MessageType> {
vec![
- $((std::any::type_name::<$msg>(), std::any::TypeId::of::<$msg>())),+
+ $($crate::message::MessageType::for_message::<$msg>()),+
]
}
}
@@ -305,25 +312,24 @@ mod tests {
use static_assertions::{assert_impl_all, assert_not_impl_any};
use crate::{
- address::{ReplyReceiver, ReplySender},
+ address::{ReplyReceiverFor, ReplySenderFor},
make_receiver_bundle,
- plugin::Message,
+ plugin::{AcceptsReplies, Message},
Address,
};
#[derive(Debug)]
struct Foo;
- impl Message for Foo {
+ impl Message for Foo {}
+ impl AcceptsReplies for Foo {
type Reply = Bar;
}
#[derive(Debug)]
struct Bar;
- impl Message for Bar {
- type Reply = Bar;
- }
+ impl Message for Bar {}
make_receiver_bundle!(struct FooBar(Foo, Bar));
@@ -344,6 +350,6 @@ mod tests {
assert_impl_all!(Address<FooBar>: Clone, Send, Sync);
assert_not_impl_any!(NotSync: Send, Sync);
- assert_impl_all!(ReplySender<NotSync>: Send, Sync);
- assert_impl_all!(ReplyReceiver<NotSync>: Send, Sync);
+ assert_impl_all!(ReplySenderFor<NotSync>: Send, Sync);
+ assert_impl_all!(ReplyReceiverFor<NotSync>: Send, Sync);
}