diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-04-27 10:38:56 +0200 |
---|---|---|
committer | Marcel Müller <m.mueller@ifm.com> | 2022-04-27 11:05:19 +0200 |
commit | 449d2f3bba579b4d6553789d4777f0f0f2a89384 (patch) | |
tree | a67956be74f7b8f4429cac943a8d77b22f85a3e9 /crates | |
parent | 86530000d17d0727d20423fae01226201c2a1aa5 (diff) |
Add try_send method
This method is useful for plugins that do not wish to get caught in
potentially await forever if the receiving plugin does not handle the
messages it is sent.
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates')
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index 071de23f..3621d833 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -1,5 +1,7 @@ use std::{marker::PhantomData, time::Duration}; +use tokio::sync::mpsc::error::TrySendError; + use crate::plugin::Message; #[doc(hidden)] @@ -96,6 +98,38 @@ impl<RB: ReceiverBundle> Address<RB> { reply_recv: receiver, }) } + + /// Try sending a message `M` to the plugin behind this address without potentially waiting + /// + /// This function should be used when waiting for the plugin to receive the message is not + /// required. + /// + /// # Return + /// + /// The function either returns `Ok(())` if sending the message succeeded, + /// or the message in the error variant of the `Result`: `Err(M)`. + /// + /// The error is returned if the receiving side (the plugin that is addressed) cannot currently + /// receive messages (either because it is closed or the queue is full). + pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> { + let (sender, receiver) = tokio::sync::oneshot::channel(); + + self.sender + .try_send(InternalMessage { + data: Box::new(msg), + reply_sender: sender, + }) + .map_err(|msg| match msg { + TrySendError::Full(data) | TrySendError::Closed(data) => { + *data.data.downcast::<M>().unwrap() + } + })?; + + Ok(ReplyReceiver { + _pd: PhantomData, + reply_recv: receiver, + }) + } } #[derive(Debug)] |