diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-04-27 10:55:30 +0200 |
---|---|---|
committer | Marcel Müller <m.mueller@ifm.com> | 2022-04-27 11:05:19 +0200 |
commit | 50097a89079396f08f1302b621c7b5b3cc7f6091 (patch) | |
tree | a8f47b350c90989c00c7ff915b9d857f27f38300 | |
parent | 449d2f3bba579b4d6553789d4777f0f0f2a89384 (diff) |
Add send_with_timeout
Sending messages with a timeout ensures that if a plugin takes too long
to accept the message this method eventually returns.
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 37 |
1 files changed, 36 insertions, 1 deletions
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index 3621d833..0aaf5f86 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -1,6 +1,6 @@ use std::{marker::PhantomData, time::Duration}; -use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; use crate::plugin::Message; @@ -130,6 +130,41 @@ impl<RB: ReceiverBundle> Address<RB> { reply_recv: receiver, }) } + + /// Send a message `M` to the address represented by the instance of this struct and wait for + /// them to accept it or timeout + /// + /// This method is identical to [`send_and_wait`] except a timeout can be specified after which + /// trying to send is aborted. + /// + /// If you do not wish to wait for a timeout see [`try_send`] + pub async fn send_with_timeout<M: Message>( + &self, + msg: M, + timeout: Duration, + ) -> Result<ReplyReceiver<M::Reply>, M> { + let (sender, receiver) = tokio::sync::oneshot::channel(); + + self.sender + .send_timeout( + InternalMessage { + data: Box::new(msg), + reply_sender: sender, + }, + timeout, + ) + .await + .map_err(|msg| match msg { + SendTimeoutError::Timeout(data) | SendTimeoutError::Closed(data) => { + *data.data.downcast::<M>().unwrap() + } + })?; + + Ok(ReplyReceiver { + _pd: PhantomData, + reply_recv: receiver, + }) + } } #[derive(Debug)] |