diff options
author | Jake <me@jh.gg> | 2020-02-25 09:06:02 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-25 09:06:02 -0800 |
commit | b9cc032d3bcf7686f0130163c13d3d660d30ae2c (patch) | |
tree | 0487832235e3f19081834fcb4c5946c8d5347f1c /tokio/src/sync/mpsc | |
parent | 4213b794611d706beb0ad244a1ad03086bb8a94d (diff) |
mpsc: add `Sender::send_timeout` (#2227)
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 79 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/error.rs | 33 |
2 files changed, 111 insertions, 1 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index b95611d8..ba89fdd7 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -2,6 +2,11 @@ use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; use crate::sync::semaphore_ll as semaphore; +cfg_time! { + use crate::sync::mpsc::error::SendTimeoutError; + use crate::time::Duration; +} + use std::fmt; use std::task::{Context, Poll}; @@ -322,3 +327,77 @@ impl<T> Sender<T> { } } } + +cfg_time! { + impl<T> Sender<T> { + /// Sends a value, waiting until there is capacity, but only for a limited time. + /// + /// Shares the same success and error conditions as [`send`], adding one more + /// condition for an unsuccessful send, which is when the provided timeout has + /// elapsed, and there is no capacity available. + /// + /// [`send`]: Sender::send + /// + /// # Errors + /// + /// If the receive half of the channel is closed, either due to [`close`] being + /// called or the [`Receiver`] handle dropping, or if the timeout specified + /// elapses before the capacity is available the function returns an error. + /// The error includes the value passed to `send_timeout`. + /// + /// [`close`]: Receiver::close + /// [`Receiver`]: Receiver + /// + /// # Examples + /// + /// In the following example, each call to `send_timeout` will block until the + /// previously sent value was received, unless the timeout has elapsed. + /// + /// ```rust + /// use tokio::sync::mpsc; + /// use tokio::time::{delay_for, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await { + /// println!("send error: #{:?}", e); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// delay_for(Duration::from_millis(200)).await; + /// } + /// } + /// ``` + pub async fn send_timeout( + &mut self, + value: T, + timeout: Duration, + ) -> Result<(), SendTimeoutError<T>> { + use crate::future::poll_fn; + + match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { + Err(_) => { + return Err(SendTimeoutError::Timeout(value)); + } + Ok(Err(_)) => { + return Err(SendTimeoutError::Closed(value)); + } + Ok(_) => {} + } + + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), + } + } + } +} diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs index 802bc488..99193563 100644 --- a/tokio/src/sync/mpsc/error.rs +++ b/tokio/src/sync/mpsc/error.rs @@ -96,7 +96,7 @@ impl Error for TryRecvError {} // ===== ClosedError ===== -/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)]. +/// Error returned by [`Sender::poll_ready`](super::Sender::poll_ready)]. #[derive(Debug)] pub struct ClosedError(()); @@ -113,3 +113,34 @@ impl fmt::Display for ClosedError { } impl Error for ClosedError {} + +cfg_time! { + // ===== SendTimeoutError ===== + + #[derive(Debug)] + /// Error returned by [`Sender::send_timeout`](super::Sender::send_timeout)]. + pub enum SendTimeoutError<T> { + /// The data could not be sent on the channel because the channel is + /// full, and the timeout to send has elapsed. + Timeout(T), + + /// The receive half of the channel was explicitly closed or has been + /// dropped. + Closed(T), + } + + impl<T: fmt::Debug> Error for SendTimeoutError<T> {} + + impl<T> fmt::Display for SendTimeoutError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + SendTimeoutError::Timeout(..) => "timed out waiting on send operation", + SendTimeoutError::Closed(..) => "channel closed", + } + ) + } + } +} |