summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-04-27 10:55:30 +0200
committerMarcel Müller <m.mueller@ifm.com>2022-04-27 11:05:19 +0200
commit50097a89079396f08f1302b621c7b5b3cc7f6091 (patch)
treea8f47b350c90989c00c7ff915b9d857f27f38300
parent449d2f3bba579b4d6553789d4777f0f0f2a89384 (diff)
Add send_with_timeout
Sending messages with a timeout ensures that if a plugin takes too long to accept the message this method eventually returns. Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r--crates/core/tedge_api/src/address.rs37
1 files changed, 36 insertions, 1 deletions
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index 3621d833..0aaf5f86 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -1,6 +1,6 @@
use std::{marker::PhantomData, time::Duration};
-use tokio::sync::mpsc::error::TrySendError;
+use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use crate::plugin::Message;
@@ -130,6 +130,41 @@ impl<RB: ReceiverBundle> Address<RB> {
reply_recv: receiver,
})
}
+
+ /// Send a message `M` to the address represented by the instance of this struct and wait for
+ /// them to accept it or timeout
+ ///
+ /// This method is identical to [`send_and_wait`] except a timeout can be specified after which
+ /// trying to send is aborted.
+ ///
+ /// If you do not wish to wait for a timeout see [`try_send`]
+ pub async fn send_with_timeout<M: Message>(
+ &self,
+ msg: M,
+ timeout: Duration,
+ ) -> Result<ReplyReceiver<M::Reply>, M> {
+ let (sender, receiver) = tokio::sync::oneshot::channel();
+
+ self.sender
+ .send_timeout(
+ InternalMessage {
+ data: Box::new(msg),
+ reply_sender: sender,
+ },
+ timeout,
+ )
+ .await
+ .map_err(|msg| match msg {
+ SendTimeoutError::Timeout(data) | SendTimeoutError::Closed(data) => {
+ *data.data.downcast::<M>().unwrap()
+ }
+ })?;
+
+ Ok(ReplyReceiver {
+ _pd: PhantomData,
+ reply_recv: receiver,
+ })
+ }
}
#[derive(Debug)]