summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/mpsc/unbounded.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/sync/mpsc/unbounded.rs
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/sync/mpsc/unbounded.rs')
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs102
1 files changed, 13 insertions, 89 deletions
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 4eb750ef..07a173c2 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -1,11 +1,10 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
+use crate::sync::mpsc::error::SendError;
use std::fmt;
use std::task::{Context, Poll};
-use std::pin::Pin;
-
/// Send values to the associated `UnboundedReceiver`.
///
/// Instances are created by the
@@ -47,18 +46,6 @@ impl<T> fmt::Debug for UnboundedReceiver<T> {
}
}
-/// Error returned by the `UnboundedSender`.
-#[derive(Debug)]
-pub struct UnboundedSendError(());
-
-/// Returned by `UnboundedSender::try_send` when the channel has been closed.
-#[derive(Debug)]
-pub struct UnboundedTrySendError<T>(T);
-
-/// Error returned by `UnboundedReceiver`.
-#[derive(Debug)]
-pub struct UnboundedRecvError(());
-
/// Create an unbounded mpsc channel for communicating between asynchronous
/// tasks.
///
@@ -86,7 +73,7 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
- #[doc(hidden)] // TODO: remove
+ #[doc(hidden)] // TODO: doc
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -103,10 +90,10 @@ impl<T> UnboundedReceiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::unbounded_channel();
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
///
/// tokio::spawn(async move {
- /// tx.try_send("hello").unwrap();
+ /// tx.send("hello").unwrap();
/// });
///
/// assert_eq!(Some("hello"), rx.recv().await);
@@ -121,17 +108,17 @@ impl<T> UnboundedReceiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::unbounded_channel();
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
///
- /// tx.try_send("hello").unwrap();
- /// tx.try_send("world").unwrap();
+ /// tx.send("hello").unwrap();
+ /// tx.send("world").unwrap();
///
/// assert_eq!(Some("hello"), rx.recv().await);
/// assert_eq!(Some("world"), rx.recv().await);
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
- use futures_util::future::poll_fn;
+ use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
@@ -145,11 +132,12 @@ impl<T> UnboundedReceiver<T> {
}
}
+#[cfg(feature = "stream")]
impl<T> futures_core::Stream for UnboundedReceiver<T> {
type Item = T;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.chan.recv(cx)
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.poll_recv(cx)
}
}
@@ -159,72 +147,8 @@ impl<T> UnboundedSender<T> {
}
/// Attempts to send a message on this `UnboundedSender` without blocking.
- pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> {
- self.chan.try_send(message)?;
+ pub fn send(&self, message: T) -> Result<(), SendError<T>> {
+ self.chan.send_unbounded(message)?;
Ok(())
}
}
-
-impl<T> futures_sink::Sink<T> for UnboundedSender<T> {
- type Error = UnboundedSendError;
-
- fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.try_send(msg).map_err(|_| UnboundedSendError(()))
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-// ===== impl UnboundedSendError =====
-
-impl fmt::Display for UnboundedSendError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for UnboundedSendError {}
-
-// ===== impl TrySendError =====
-
-impl<T> UnboundedTrySendError<T> {
- /// Get the inner value.
- pub fn into_inner(self) -> T {
- self.0
- }
-}
-
-impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {}
-
-impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> {
- fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> {
- assert_eq!(chan::TrySendError::Closed, err);
- UnboundedTrySendError(value)
- }
-}
-
-// ===== impl UnboundedRecvError =====
-
-impl fmt::Display for UnboundedRecvError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for UnboundedRecvError {}