summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc
diff options
context:
space:
mode:
authornasa <htilcs1115@gmail.com>2020-04-07 05:49:10 +0900
committerGitHub <noreply@github.com>2020-04-06 22:49:10 +0200
commitde8326a5a490aafdf12848820d1496b758f1ec57 (patch)
treea72ad2ca481c6678671fdfa14a2ee79e34c14b1b /tokio/src/sync/mpsc
parentd65bf3805be376505f5c2fb0724cec8917dfb813 (diff)
doc: Sort methods on mpsc::Sender in doc (#2379)
Diffstat (limited to 'tokio/src/sync/mpsc')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs307
1 files changed, 152 insertions, 155 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index a0b6db38..afca8c52 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -196,87 +196,62 @@ impl<T> Sender<T> {
Sender { chan }
}
- /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
- ///
- /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
- /// slot becomes available.
- ///
- /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless
- /// the channel has since been closed. To provide this guarantee, the channel reserves one slot
- /// in the channel for the coming send. This reserved slot is not available to other `Sender`
- /// instances, so you need to be careful to not end up with deadlocks by blocking after calling
- /// `poll_ready` but before sending an element.
+ /// Sends a value, waiting until there is capacity.
///
- /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you
- /// can use [`disarm`](Sender::disarm) to release the reserved slot.
+ /// A successful send occurs when it is determined that the other end of the
+ /// channel has not hung up already. An unsuccessful send would be one where
+ /// the corresponding receiver has already been closed. Note that a return
+ /// value of `Err` means that the data will never be received, but a return
+ /// value of `Ok` does not mean that the data will be received. It is
+ /// possible for the corresponding receiver to hang up immediately after
+ /// this function returns `Ok`.
///
- /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to
- /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel
- /// is closed.
- pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
- self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
- }
-
- /// Undo a successful call to `poll_ready`.
+ /// # Errors
///
- /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the
- /// channel to make room for the coming send. `disarm` allows you to give up that slot if you
- /// decide you do not wish to send an item after all. After calling `disarm`, you must call
- /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again.
+ /// If the receive half of the channel is closed, either due to [`close`]
+ /// being called or the [`Receiver`] handle dropping, the function returns
+ /// an error. The error includes the value passed to `send`.
///
- /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was
- /// not previously called, or did not succeed).
+ /// [`close`]: Receiver::close
+ /// [`Receiver`]: Receiver
///
- /// # Motivation
+ /// # Examples
///
- /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers
- /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may
- /// take up all the slots of the channel, and prevent active senders from getting any requests
- /// through. Consider this code that forwards from one channel to another:
+ /// In the following example, each call to `send` will block until the
+ /// previously sent value was received.
///
- /// ```rust,ignore
- /// loop {
- /// ready!(tx.poll_ready(cx))?;
- /// if let Some(item) = ready!(rx.poll_recv(cx)) {
- /// tx.try_send(item)?;
- /// } else {
- /// break;
- /// }
- /// }
- /// ```
+ /// ```rust
+ /// use tokio::sync::mpsc;
///
- /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then
- /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do,
- /// they are effectively each reducing the channel's capacity by 1. If enough of these
- /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
- /// for them through `poll_ready`, and the system will deadlock.
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(1);
///
- /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
- /// you have to block. We can then fix the code above by writing:
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(_) = tx.send(i).await {
+ /// println!("receiver dropped");
+ /// return;
+ /// }
+ /// }
+ /// });
///
- /// ```rust,ignore
- /// loop {
- /// ready!(tx.poll_ready(cx))?;
- /// let item = rx.poll_recv(cx);
- /// if let Poll::Ready(Ok(_)) = item {
- /// // we're going to send the item below, so don't disarm
- /// } else {
- /// // give up our send slot, we won't need it for a while
- /// tx.disarm();
- /// }
- /// if let Some(item) = ready!(item) {
- /// tx.try_send(item)?;
- /// } else {
- /// break;
- /// }
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// }
/// }
/// ```
- pub fn disarm(&mut self) -> bool {
- if self.chan.is_ready() {
- self.chan.disarm();
- true
- } else {
- false
+ pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
+ use crate::future::poll_fn;
+
+ if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
+ return Err(SendError(value));
+ }
+
+ match self.try_send(value) {
+ Ok(()) => Ok(()),
+ Err(TrySendError::Full(_)) => unreachable!(),
+ Err(TrySendError::Closed(value)) => Err(SendError(value)),
}
}
@@ -347,15 +322,13 @@ impl<T> Sender<T> {
Ok(())
}
- /// Sends a value, waiting until there is capacity.
+ /// Sends a value, waiting until there is capacity, but only for a limited time.
///
- /// A successful send occurs when it is determined that the other end of the
- /// channel has not hung up already. An unsuccessful send would be one where
- /// the corresponding receiver has already been closed. Note that a return
- /// value of `Err` means that the data will never be received, but a return
- /// value of `Ok` does not mean that the data will be received. It is
- /// possible for the corresponding receiver to hang up immediately after
- /// this function returns `Ok`.
+ /// Shares the same success and error conditions as [`send`], adding one more
+ /// condition for an unsuccessful send, which is when the provided timeout has
+ /// elapsed, and there is no capacity available.
+ ///
+ /// [`send`]: Sender::send
///
/// # Errors
///
@@ -368,11 +341,12 @@ impl<T> Sender<T> {
///
/// # Examples
///
- /// In the following example, each call to `send` will block until the
- /// previously sent value was received.
+ /// In the following example, each call to `send_timeout` will block until the
+ /// previously sent value was received, unless the timeout has elapsed.
///
/// ```rust
/// use tokio::sync::mpsc;
+ /// use tokio::time::{delay_for, Duration};
///
/// #[tokio::main]
/// async fn main() {
@@ -380,8 +354,8 @@ impl<T> Sender<T> {
///
/// tokio::spawn(async move {
/// for i in 0..10 {
- /// if let Err(_) = tx.send(i).await {
- /// println!("receiver dropped");
+ /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
+ /// println!("send error: #{:?}", e);
/// return;
/// }
/// }
@@ -389,94 +363,117 @@ impl<T> Sender<T> {
///
/// while let Some(i) = rx.recv().await {
/// println!("got = {}", i);
+ /// delay_for(Duration::from_millis(200)).await;
/// }
/// }
/// ```
- pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
+ #[cfg(feature = "time")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+ pub async fn send_timeout(
+ &mut self,
+ value: T,
+ timeout: Duration,
+ ) -> Result<(), SendTimeoutError<T>> {
use crate::future::poll_fn;
- if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
- return Err(SendError(value));
+ match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await {
+ Err(_) => {
+ return Err(SendTimeoutError::Timeout(value));
+ }
+ Ok(Err(_)) => {
+ return Err(SendTimeoutError::Closed(value));
+ }
+ Ok(_) => {}
}
match self.try_send(value) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => unreachable!(),
- Err(TrySendError::Closed(value)) => Err(SendError(value)),
+ Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)),
}
}
-}
-cfg_time! {
- impl<T> Sender<T> {
- /// Sends a value, waiting until there is capacity, but only for a limited time.
- ///
- /// Shares the same success and error conditions as [`send`], adding one more
- /// condition for an unsuccessful send, which is when the provided timeout has
- /// elapsed, and there is no capacity available.
- ///
- /// [`send`]: Sender::send
- ///
- /// # Errors
- ///
- /// If the receive half of the channel is closed, either due to [`close`] being
- /// called or the [`Receiver`] handle dropping, or if the timeout specified
- /// elapses before the capacity is available the function returns an error.
- /// The error includes the value passed to `send_timeout`.
- ///
- /// [`close`]: Receiver::close
- /// [`Receiver`]: Receiver
- ///
- /// # Examples
- ///
- /// In the following example, each call to `send_timeout` will block until the
- /// previously sent value was received, unless the timeout has elapsed.
- ///
- /// ```rust
- /// use tokio::sync::mpsc;
- /// use tokio::time::{delay_for, Duration};
- ///
- /// #[tokio::main]
- /// async fn main() {
- /// let (mut tx, mut rx) = mpsc::channel(1);
- ///
- /// tokio::spawn(async move {
- /// for i in 0..10 {
- /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
- /// println!("send error: #{:?}", e);
- /// return;
- /// }
- /// }
- /// });
- ///
- /// while let Some(i) = rx.recv().await {
- /// println!("got = {}", i);
- /// delay_for(Duration::from_millis(200)).await;
- /// }
- /// }
- /// ```
- pub async fn send_timeout(
- &mut self,
- value: T,
- timeout: Duration,
- ) -> Result<(), SendTimeoutError<T>> {
- use crate::future::poll_fn;
-
- match crate::time::timeout(timeout, poll_fn(|cx| self.poll_ready(cx))).await {
- Err(_) => {
- return Err(SendTimeoutError::Timeout(value));
- }
- Ok(Err(_)) => {
- return Err(SendTimeoutError::Closed(value));
- }
- Ok(_) => {}
- }
+ /// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
+ ///
+ /// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
+ /// slot becomes available.
+ ///
+ /// Once `poll_ready` returns `Poll::Ready(Ok(()))`, a call to `try_send` will succeed unless
+ /// the channel has since been closed. To provide this guarantee, the channel reserves one slot
+ /// in the channel for the coming send. This reserved slot is not available to other `Sender`
+ /// instances, so you need to be careful to not end up with deadlocks by blocking after calling
+ /// `poll_ready` but before sending an element.
+ ///
+ /// If, after `poll_ready` succeeds, you decide you do not wish to send an item after all, you
+ /// can use [`disarm`](Sender::disarm) to release the reserved slot.
+ ///
+ /// Until an item is sent or [`disarm`](Sender::disarm) is called, repeated calls to
+ /// `poll_ready` will return either `Poll::Ready(Ok(()))` or `Poll::Ready(Err(_))` if channel
+ /// is closed.
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
+ self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
+ }
- match self.try_send(value) {
- Ok(()) => Ok(()),
- Err(TrySendError::Full(_)) => unreachable!(),
- Err(TrySendError::Closed(value)) => Err(SendTimeoutError::Closed(value)),
- }
+ /// Undo a successful call to `poll_ready`.
+ ///
+ /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, it holds up one slot in the
+ /// channel to make room for the coming send. `disarm` allows you to give up that slot if you
+ /// decide you do not wish to send an item after all. After calling `disarm`, you must call
+ /// `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to send again.
+ ///
+ /// Returns `false` if no slot is reserved for this sender (usually because `poll_ready` was
+ /// not previously called, or did not succeed).
+ ///
+ /// # Motivation
+ ///
+ /// Since `poll_ready` takes up one of the finite number of slots in a bounded channel, callers
+ /// need to send an item shortly after `poll_ready` succeeds. If they do not, idle senders may
+ /// take up all the slots of the channel, and prevent active senders from getting any requests
+ /// through. Consider this code that forwards from one channel to another:
+ ///
+ /// ```rust,ignore
+ /// loop {
+ /// ready!(tx.poll_ready(cx))?;
+ /// if let Some(item) = ready!(rx.poll_recv(cx)) {
+ /// tx.try_send(item)?;
+ /// } else {
+ /// break;
+ /// }
+ /// }
+ /// ```
+ ///
+ /// If many such forwarders exist, and they all forward into a single (cloned) `Sender`, then
+ /// any number of forwarders may be waiting for `rx.poll_recv` at the same time. While they do,
+ /// they are effectively each reducing the channel's capacity by 1. If enough of these
+ /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
+ /// for them through `poll_ready`, and the system will deadlock.
+ ///
+ /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
+ /// you have to block. We can then fix the code above by writing:
+ ///
+ /// ```rust,ignore
+ /// loop {
+ /// ready!(tx.poll_ready(cx))?;
+ /// let item = rx.poll_recv(cx);
+ /// if let Poll::Ready(Ok(_)) = item {
+ /// // we're going to send the item below, so don't disarm
+ /// } else {
+ /// // give up our send slot, we won't need it for a while
+ /// tx.disarm();
+ /// }
+ /// if let Some(item) = ready!(item) {
+ /// tx.try_send(item)?;
+ /// } else {
+ /// break;
+ /// }
+ /// }
+ /// ```
+ pub fn disarm(&mut self) -> bool {
+ if self.chan.is_ready() {
+ self.chan.disarm();
+ true
+ } else {
+ false
}
}
}