summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc
diff options
context:
space:
mode:
authorJake <me@jh.gg>2020-02-25 09:06:02 -0800
committerGitHub <noreply@github.com>2020-02-25 09:06:02 -0800
commitb9cc032d3bcf7686f0130163c13d3d660d30ae2c (patch)
tree0487832235e3f19081834fcb4c5946c8d5347f1c /tokio/src/sync/mpsc
parent4213b794611d706beb0ad244a1ad03086bb8a94d (diff)
mpsc: add `Sender::send_timeout` (#2227)
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs79
-rw-r--r--tokio/src/sync/mpsc/error.rs33
2 files changed, 111 insertions, 1 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index b95611d8..ba89fdd7 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -2,6 +2,11 @@ use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError};
use crate::sync::semaphore_ll as semaphore;
+cfg_time! {
+ use crate::sync::mpsc::error::SendTimeoutError;
+ use crate::time::Duration;
+}
+
use std::fmt;
use std::task::{Context, Poll};
@@ -322,3 +327,77 @@ impl<T> Sender<T> {
}
}
}
+
+cfg_time! {
+ impl<T> Sender<T> {
+ /// Sends a value, waiting until there is capacity, but only for a limited time.
+ ///
+ /// Shares the same success and error conditions as [`send`], adding one more
+ /// condition for an unsuccessful send, which is when the provided timeout has
+ /// elapsed, and there is no capacity available.
+ ///
+ /// [`send`]: Sender::send
+ ///
+ /// # Errors
+ ///
+ /// If the receive half of the channel is closed, either due to [`close`] being
+ /// called or the [`Receiver`] handle dropping, or if the timeout specified
+ /// elapses before the capacity is available the function returns an error.
+ /// The error includes the value passed to `send_timeout`.
+ ///
+ /// [`close`]: Receiver::close
+ /// [`Receiver`]: Receiver
+ ///
+ /// # Examples
+ ///
+ /// In the following example, each call to `send_timeout` will block until the
+ /// previously sent value was received, unless the timeout has elapsed.
+ ///
+ /// ```rust
+ /// use tokio::sync::mpsc;
+ /// use tokio::time::{delay_for, Duration};
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(1);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
+ /// println!("send error: #{:?}", e);
+ /// return;
+ /// }
+ /// }
+ /// });
+ ///
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// delay_for(Duration::from_millis(200)).await;
+ /// }
+ /// }
+ /// ```
+ pub async fn send_timeout(
+ &mut self,
+ value: T,
+ timeout: Duration,
+ ) -> Result<(), SendTimeoutError<T>> {
+ use crate::future::poll_fn;
+
+ match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await {
+ Err(_) => {
+ return Err(SendTimeoutError::Timeout(value));
+ }
+ Ok(Err(_)) => {
+ return Err(SendTimeoutError::Closed(value));
+ }
+ Ok(_) => {}
+ }
+
+ match self.try_send(value) {
+ Ok(()) => Ok(()),
+ Err(TrySendError::Full(_)) => unreachable!(),
+ Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)),
+ }
+ }
+ }
+}
diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs
index 802bc488..99193563 100644
--- a/tokio/src/sync/mpsc/error.rs
+++ b/tokio/src/sync/mpsc/error.rs
@@ -96,7 +96,7 @@ impl Error for TryRecvError {}
// ===== ClosedError =====
-/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
+/// Error returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
#[derive(Debug)]
pub struct ClosedError(());
@@ -113,3 +113,34 @@ impl fmt::Display for ClosedError {
}
impl Error for ClosedError {}
+
+cfg_time! {
+ // ===== SendTimeoutError =====
+
+ #[derive(Debug)]
+ /// Error returned by [`Sender::send_timeout`](super::Sender::send_timeout)].
+ pub enum SendTimeoutError<T> {
+ /// The data could not be sent on the channel because the channel is
+ /// full, and the timeout to send has elapsed.
+ Timeout(T),
+
+ /// The receive half of the channel was explicitly closed or has been
+ /// dropped.
+ Closed(T),
+ }
+
+ impl<T: fmt::Debug> Error for SendTimeoutError<T> {}
+
+ impl<T> fmt::Display for SendTimeoutError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ SendTimeoutError::Timeout(..) => "timed out waiting on send operation",
+ SendTimeoutError::Closed(..) => "channel closed",
+ }
+ )
+ }
+ }
+}