diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/sync/mpsc/unbounded.rs | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 102 |
1 files changed, 13 insertions, 89 deletions
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 4eb750ef..07a173c2 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -1,11 +1,10 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::sync::mpsc::chan; +use crate::sync::mpsc::error::SendError; use std::fmt; use std::task::{Context, Poll}; -use std::pin::Pin; - /// Send values to the associated `UnboundedReceiver`. /// /// Instances are created by the @@ -47,18 +46,6 @@ impl<T> fmt::Debug for UnboundedReceiver<T> { } } -/// Error returned by the `UnboundedSender`. -#[derive(Debug)] -pub struct UnboundedSendError(()); - -/// Returned by `UnboundedSender::try_send` when the channel has been closed. -#[derive(Debug)] -pub struct UnboundedTrySendError<T>(T); - -/// Error returned by `UnboundedReceiver`. -#[derive(Debug)] -pub struct UnboundedRecvError(()); - /// Create an unbounded mpsc channel for communicating between asynchronous /// tasks. /// @@ -86,7 +73,7 @@ impl<T> UnboundedReceiver<T> { UnboundedReceiver { chan } } - #[doc(hidden)] // TODO: remove + #[doc(hidden)] // TODO: doc pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -103,10 +90,10 @@ impl<T> UnboundedReceiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// let (tx, mut rx) = mpsc::unbounded_channel(); /// /// tokio::spawn(async move { - /// tx.try_send("hello").unwrap(); + /// tx.send("hello").unwrap(); /// }); /// /// assert_eq!(Some("hello"), rx.recv().await); @@ -121,17 +108,17 @@ impl<T> UnboundedReceiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// let (tx, mut rx) = mpsc::unbounded_channel(); /// - /// tx.try_send("hello").unwrap(); - /// tx.try_send("world").unwrap(); + /// tx.send("hello").unwrap(); + /// tx.send("world").unwrap(); /// /// assert_eq!(Some("hello"), rx.recv().await); /// assert_eq!(Some("world"), rx.recv().await); /// } /// ``` pub async fn recv(&mut self) -> Option<T> { - use futures_util::future::poll_fn; + use crate::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } @@ -145,11 +132,12 @@ impl<T> UnboundedReceiver<T> { } } +#[cfg(feature = "stream")] impl<T> futures_core::Stream for UnboundedReceiver<T> { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.chan.recv(cx) + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.poll_recv(cx) } } @@ -159,72 +147,8 @@ impl<T> UnboundedSender<T> { } /// Attempts to send a message on this `UnboundedSender` without blocking. - pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> { - self.chan.try_send(message)?; + pub fn send(&self, message: T) -> Result<(), SendError<T>> { + self.chan.send_unbounded(message)?; Ok(()) } } - -impl<T> futures_sink::Sink<T> for UnboundedSender<T> { - type Error = UnboundedSendError; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.try_send(msg).map_err(|_| UnboundedSendError(())) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } -} - -// ===== impl UnboundedSendError ===== - -impl fmt::Display for UnboundedSendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for UnboundedSendError {} - -// ===== impl TrySendError ===== - -impl<T> UnboundedTrySendError<T> { - /// Get the inner value. - pub fn into_inner(self) -> T { - self.0 - } -} - -impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {} - -impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> { - fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> { - assert_eq!(chan::TrySendError::Closed, err); - UnboundedTrySendError(value) - } -} - -// ===== impl UnboundedRecvError ===== - -impl fmt::Display for UnboundedRecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for UnboundedRecvError {} |