diff options
Diffstat (limited to 'tokio/src/sync/mpsc/bounded.rs')
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 144 |
1 files changed, 19 insertions, 125 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 787dd507..523dde75 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,8 +1,8 @@ use crate::sync::mpsc::chan; +use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError}; use crate::sync::semaphore; use std::fmt; -use std::pin::Pin; use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. @@ -44,27 +44,6 @@ impl<T> fmt::Debug for Receiver<T> { } } -/// Error returned by the `Sender`. -#[derive(Debug)] -pub struct SendError(()); - -/// Error returned by `Sender::try_send`. -#[derive(Debug)] -pub struct TrySendError<T> { - kind: ErrorKind, - value: T, -} - -#[derive(Debug)] -enum ErrorKind { - Closed, - NoCapacity, -} - -/// Error returned by `Receiver`. -#[derive(Debug)] -pub struct RecvError(()); - /// Create a bounded mpsc channel for communicating between asynchronous tasks, /// returning the sender/receiver halves. /// @@ -161,12 +140,12 @@ impl<T> Receiver<T> { /// } /// ``` pub async fn recv(&mut self) -> Option<T> { - use futures_util::future::poll_fn; + use crate::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } - #[doc(hidden)] // TODO: remove + #[doc(hidden)] // TODO: document pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -180,11 +159,14 @@ impl<T> Receiver<T> { } } +impl<T> Unpin for Receiver<T> {} + +#[cfg(feature = "stream")] impl<T> futures_core::Stream for Receiver<T> { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.get_mut().poll_recv(cx) + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.poll_recv(cx) } } @@ -193,9 +175,9 @@ impl<T> Sender<T> { Sender { chan } } - #[doc(hidden)] // TODO: remove - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { - self.chan.poll_ready(cx).map_err(|_| SendError(())) + #[doc(hidden)] // TODO: document + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { + self.chan.poll_ready(cx).map_err(|_| ClosedError::new()) } /// Attempts to send a message on this `Sender`, returning the message @@ -233,105 +215,17 @@ impl<T> Sender<T> { /// } /// } /// ``` - pub async fn send(&mut self, value: T) -> Result<(), SendError> { - use futures_util::future::poll_fn; - - poll_fn(|cx| self.poll_ready(cx)).await?; - - self.try_send(value).map_err(|_| SendError(())) - } -} - -impl<T> futures_sink::Sink<T> for Sender<T> { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Sender::poll_ready(self.get_mut(), cx) - } - - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.as_mut().try_send(msg).map_err(|err| { - assert!(err.is_full(), "call `poll_ready` before sending"); - SendError(()) - }) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } -} - -// ===== impl SendError ===== - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for SendError {} - -// ===== impl TrySendError ===== - -impl<T> TrySendError<T> { - /// Get the inner value. - pub fn into_inner(self) -> T { - self.value - } + pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> { + use crate::future::poll_fn; - /// Did the send fail because the channel has been closed? - pub fn is_closed(&self) -> bool { - if let ErrorKind::Closed = self.kind { - true - } else { - false + if poll_fn(|cx| self.poll_ready(cx)).await.is_err() { + return Err(SendError(value)); } - } - /// Did the send fail because the channel was at capacity? - pub fn is_full(&self) -> bool { - if let ErrorKind::NoCapacity = self.kind { - true - } else { - false + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendError(value)), } } } - -impl<T: fmt::Debug> fmt::Display for TrySendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = match self.kind { - ErrorKind::Closed => "channel closed", - ErrorKind::NoCapacity => "no available capacity", - }; - write!(fmt, "{}", descr) - } -} - -impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {} - -impl<T> From<(T, chan::TrySendError)> for TrySendError<T> { - fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> { - TrySendError { - value, - kind: match err { - chan::TrySendError::Closed => ErrorKind::Closed, - chan::TrySendError::NoPermits => ErrorKind::NoCapacity, - }, - } - } -} - -// ===== impl RecvError ===== - -impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for RecvError {} |