summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/bounded.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/sync/mpsc/bounded.rs')
-rw-r--r--tokio/src/sync/mpsc/bounded.rs144
1 files changed, 19 insertions, 125 deletions
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 787dd507..523dde75 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -1,8 +1,8 @@
use crate::sync::mpsc::chan;
+use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError};
use crate::sync::semaphore;
use std::fmt;
-use std::pin::Pin;
use std::task::{Context, Poll};
/// Send values to the associated `Receiver`.
@@ -44,27 +44,6 @@ impl<T> fmt::Debug for Receiver<T> {
}
}
-/// Error returned by the `Sender`.
-#[derive(Debug)]
-pub struct SendError(());
-
-/// Error returned by `Sender::try_send`.
-#[derive(Debug)]
-pub struct TrySendError<T> {
- kind: ErrorKind,
- value: T,
-}
-
-#[derive(Debug)]
-enum ErrorKind {
- Closed,
- NoCapacity,
-}
-
-/// Error returned by `Receiver`.
-#[derive(Debug)]
-pub struct RecvError(());
-
/// Create a bounded mpsc channel for communicating between asynchronous tasks,
/// returning the sender/receiver halves.
///
@@ -161,12 +140,12 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
- use futures_util::future::poll_fn;
+ use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
- #[doc(hidden)] // TODO: remove
+ #[doc(hidden)] // TODO: document
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -180,11 +159,14 @@ impl<T> Receiver<T> {
}
}
+impl<T> Unpin for Receiver<T> {}
+
+#[cfg(feature = "stream")]
impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.get_mut().poll_recv(cx)
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.poll_recv(cx)
}
}
@@ -193,9 +175,9 @@ impl<T> Sender<T> {
Sender { chan }
}
- #[doc(hidden)] // TODO: remove
- pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
- self.chan.poll_ready(cx).map_err(|_| SendError(()))
+ #[doc(hidden)] // TODO: document
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
+ self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
}
/// Attempts to send a message on this `Sender`, returning the message
@@ -233,105 +215,17 @@ impl<T> Sender<T> {
/// }
/// }
/// ```
- pub async fn send(&mut self, value: T) -> Result<(), SendError> {
- use futures_util::future::poll_fn;
-
- poll_fn(|cx| self.poll_ready(cx)).await?;
-
- self.try_send(value).map_err(|_| SendError(()))
- }
-}
-
-impl<T> futures_sink::Sink<T> for Sender<T> {
- type Error = SendError;
-
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Sender::poll_ready(self.get_mut(), cx)
- }
-
- fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.as_mut().try_send(msg).map_err(|err| {
- assert!(err.is_full(), "call `poll_ready` before sending");
- SendError(())
- })
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-// ===== impl SendError =====
-
-impl fmt::Display for SendError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for SendError {}
-
-// ===== impl TrySendError =====
-
-impl<T> TrySendError<T> {
- /// Get the inner value.
- pub fn into_inner(self) -> T {
- self.value
- }
+ pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
+ use crate::future::poll_fn;
- /// Did the send fail because the channel has been closed?
- pub fn is_closed(&self) -> bool {
- if let ErrorKind::Closed = self.kind {
- true
- } else {
- false
+ if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
+ return Err(SendError(value));
}
- }
- /// Did the send fail because the channel was at capacity?
- pub fn is_full(&self) -> bool {
- if let ErrorKind::NoCapacity = self.kind {
- true
- } else {
- false
+ match self.try_send(value) {
+ Ok(()) => Ok(()),
+ Err(TrySendError::Full(_)) => unreachable!(),
+ Err(TrySendError::Closed(value)) => Err(SendError(value)),
}
}
}
-
-impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- let descr = match self.kind {
- ErrorKind::Closed => "channel closed",
- ErrorKind::NoCapacity => "no available capacity",
- };
- write!(fmt, "{}", descr)
- }
-}
-
-impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {}
-
-impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
- fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
- TrySendError {
- value,
- kind: match err {
- chan::TrySendError::Closed => ErrorKind::Closed,
- chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
- },
- }
- }
-}
-
-// ===== impl RecvError =====
-
-impl fmt::Display for RecvError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for RecvError {}