diff options
author | Jake <me@jh.gg> | 2020-02-25 09:06:02 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-25 09:06:02 -0800 |
commit | b9cc032d3bcf7686f0130163c13d3d660d30ae2c (patch) | |
tree | 0487832235e3f19081834fcb4c5946c8d5347f1c /tokio/src/sync/mpsc/bounded.rs | |
parent | 4213b794611d706beb0ad244a1ad03086bb8a94d (diff) |
mpsc: add `Sender::send_timeout` (#2227)
Diffstat (limited to 'tokio/src/sync/mpsc/bounded.rs')
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index b95611d8..ba89fdd7 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -2,6 +2,11 @@ use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{ClosedError, SendError, TryRecvError, TrySendError}; use crate::sync::semaphore_ll as semaphore; +cfg_time! { + use crate::sync::mpsc::error::SendTimeoutError; + use crate::time::Duration; +} + use std::fmt; use std::task::{Context, Poll}; @@ -322,3 +327,77 @@ impl<T> Sender<T> { } } } + +cfg_time! { + impl<T> Sender<T> { + /// Sends a value, waiting until there is capacity, but only for a limited time. + /// + /// Shares the same success and error conditions as [`send`], adding one more + /// condition for an unsuccessful send, which is when the provided timeout has + /// elapsed, and there is no capacity available. + /// + /// [`send`]: Sender::send + /// + /// # Errors + /// + /// If the receive half of the channel is closed, either due to [`close`] being + /// called or the [`Receiver`] handle dropping, or if the timeout specified + /// elapses before the capacity is available the function returns an error. + /// The error includes the value passed to `send_timeout`. + /// + /// [`close`]: Receiver::close + /// [`Receiver`]: Receiver + /// + /// # Examples + /// + /// In the following example, each call to `send_timeout` will block until the + /// previously sent value was received, unless the timeout has elapsed. + /// + /// ```rust + /// use tokio::sync::mpsc; + /// use tokio::time::{delay_for, Duration}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await { + /// println!("send error: #{:?}", e); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// delay_for(Duration::from_millis(200)).await; + /// } + /// } + /// ``` + pub async fn send_timeout( + &mut self, + value: T, + timeout: Duration, + ) -> Result<(), SendTimeoutError<T>> { + use crate::future::poll_fn; + + match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await { + Err(_) => { + return Err(SendTimeoutError::Timeout(value)); + } + Ok(Err(_)) => { + return Err(SendTimeoutError::Closed(value)); + } + Ok(_) => {} + } + + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)), + } + } + } +} |