summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-15 22:11:13 -0800
committerGitHub <noreply@github.com>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/sync
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/barrier.rs3
-rw-r--r--tokio/src/sync/mpsc/bounded.rs144
-rw-r--r--tokio/src/sync/mpsc/chan.rs98
-rw-r--r--tokio/src/sync/mpsc/error.rs86
-rw-r--r--tokio/src/sync/mpsc/mod.rs7
-rw-r--r--tokio/src/sync/mpsc/unbounded.rs102
-rw-r--r--tokio/src/sync/mutex.rs2
-rw-r--r--tokio/src/sync/oneshot.rs3
-rw-r--r--tokio/src/sync/tests/loom_atomic_waker.rs2
-rw-r--r--tokio/src/sync/tests/loom_mpsc.rs2
-rw-r--r--tokio/src/sync/tests/loom_oneshot.rs2
-rw-r--r--tokio/src/sync/tests/loom_semaphore.rs3
-rw-r--r--tokio/src/sync/watch.rs110
13 files changed, 230 insertions, 334 deletions
diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs
index 1582120e..911e78fe 100644
--- a/tokio/src/sync/barrier.rs
+++ b/tokio/src/sync/barrier.rs
@@ -8,8 +8,9 @@ use std::sync::Mutex;
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::sync::Barrier;
+///
+/// use futures::future::join_all;
/// use std::sync::Arc;
-/// use futures_util::future::join_all;
///
/// let mut handles = Vec::with_capacity(10);
/// let barrier = Arc::new(Barrier::new(10));
diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs
index 787dd507..523dde75 100644
--- a/tokio/src/sync/mpsc/bounded.rs
+++ b/tokio/src/sync/mpsc/bounded.rs
@@ -1,8 +1,8 @@
use crate::sync::mpsc::chan;
+use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError};
use crate::sync::semaphore;
use std::fmt;
-use std::pin::Pin;
use std::task::{Context, Poll};
/// Send values to the associated `Receiver`.
@@ -44,27 +44,6 @@ impl<T> fmt::Debug for Receiver<T> {
}
}
-/// Error returned by the `Sender`.
-#[derive(Debug)]
-pub struct SendError(());
-
-/// Error returned by `Sender::try_send`.
-#[derive(Debug)]
-pub struct TrySendError<T> {
- kind: ErrorKind,
- value: T,
-}
-
-#[derive(Debug)]
-enum ErrorKind {
- Closed,
- NoCapacity,
-}
-
-/// Error returned by `Receiver`.
-#[derive(Debug)]
-pub struct RecvError(());
-
/// Create a bounded mpsc channel for communicating between asynchronous tasks,
/// returning the sender/receiver halves.
///
@@ -161,12 +140,12 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
- use futures_util::future::poll_fn;
+ use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
- #[doc(hidden)] // TODO: remove
+ #[doc(hidden)] // TODO: document
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -180,11 +159,14 @@ impl<T> Receiver<T> {
}
}
+impl<T> Unpin for Receiver<T> {}
+
+#[cfg(feature = "stream")]
impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.get_mut().poll_recv(cx)
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.poll_recv(cx)
}
}
@@ -193,9 +175,9 @@ impl<T> Sender<T> {
Sender { chan }
}
- #[doc(hidden)] // TODO: remove
- pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
- self.chan.poll_ready(cx).map_err(|_| SendError(()))
+ #[doc(hidden)] // TODO: document
+ pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
+ self.chan.poll_ready(cx).map_err(|_| ClosedError::new())
}
/// Attempts to send a message on this `Sender`, returning the message
@@ -233,105 +215,17 @@ impl<T> Sender<T> {
/// }
/// }
/// ```
- pub async fn send(&mut self, value: T) -> Result<(), SendError> {
- use futures_util::future::poll_fn;
-
- poll_fn(|cx| self.poll_ready(cx)).await?;
-
- self.try_send(value).map_err(|_| SendError(()))
- }
-}
-
-impl<T> futures_sink::Sink<T> for Sender<T> {
- type Error = SendError;
-
- fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Sender::poll_ready(self.get_mut(), cx)
- }
-
- fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.as_mut().try_send(msg).map_err(|err| {
- assert!(err.is_full(), "call `poll_ready` before sending");
- SendError(())
- })
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-// ===== impl SendError =====
-
-impl fmt::Display for SendError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for SendError {}
-
-// ===== impl TrySendError =====
-
-impl<T> TrySendError<T> {
- /// Get the inner value.
- pub fn into_inner(self) -> T {
- self.value
- }
+ pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
+ use crate::future::poll_fn;
- /// Did the send fail because the channel has been closed?
- pub fn is_closed(&self) -> bool {
- if let ErrorKind::Closed = self.kind {
- true
- } else {
- false
+ if poll_fn(|cx| self.poll_ready(cx)).await.is_err() {
+ return Err(SendError(value));
}
- }
- /// Did the send fail because the channel was at capacity?
- pub fn is_full(&self) -> bool {
- if let ErrorKind::NoCapacity = self.kind {
- true
- } else {
- false
+ match self.try_send(value) {
+ Ok(()) => Ok(()),
+ Err(TrySendError::Full(_)) => unreachable!(),
+ Err(TrySendError::Closed(value)) => Err(SendError(value)),
}
}
}
-
-impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- let descr = match self.kind {
- ErrorKind::Closed => "channel closed",
- ErrorKind::NoCapacity => "no available capacity",
- };
- write!(fmt, "{}", descr)
- }
-}
-
-impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {}
-
-impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
- fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
- TrySendError {
- value,
- kind: match err {
- chan::TrySendError::Closed => ErrorKind::Closed,
- chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
- },
- }
- }
-}
-
-// ===== impl RecvError =====
-
-impl fmt::Display for RecvError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for RecvError {}
diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs
index ad0d99a8..03f35339 100644
--- a/tokio/src/sync/mpsc/chan.rs
+++ b/tokio/src/sync/mpsc/chan.rs
@@ -1,5 +1,9 @@
-use crate::loom::{cell::CausalCell, future::AtomicWaker, sync::atomic::AtomicUsize, sync::Arc};
-use crate::sync::mpsc::list;
+use crate::loom::cell::CausalCell;
+use crate::loom::future::AtomicWaker;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Arc;
+use crate::sync::mpsc::error::ClosedError;
+use crate::sync::mpsc::{error, list};
use std::fmt;
use std::process;
@@ -43,7 +47,25 @@ where
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TrySendError {
Closed,
- NoPermits,
+ Full,
+}
+
+impl<T> From<(T, TrySendError)> for error::SendError<T> {
+ fn from(src: (T, TrySendError)) -> error::SendError<T> {
+ match src.1 {
+ TrySendError::Closed => error::SendError(src.0),
+ TrySendError::Full => unreachable!(),
+ }
+ }
+}
+
+impl<T> From<(T, TrySendError)> for error::TrySendError<T> {
+ fn from(src: (T, TrySendError)) -> error::TrySendError<T> {
+ match src.1 {
+ TrySendError::Closed => error::TrySendError::Closed(src.0),
+ TrySendError::Full => error::TrySendError::Full(src.0),
+ }
+ }
}
pub(crate) trait Semaphore {
@@ -59,8 +81,11 @@ pub(crate) trait Semaphore {
fn add_permit(&self);
- fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit)
- -> Poll<Result<(), ()>>;
+ fn poll_acquire(
+ &self,
+ cx: &mut Context<'_>,
+ permit: &mut Self::Permit,
+ ) -> Poll<Result<(), ClosedError>>;
fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;
@@ -161,26 +186,19 @@ where
}
}
- pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
+ pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> {
self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}
/// Send a message and notify the receiver.
pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> {
- if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) {
- return Err((value, e));
- }
-
- // Push the value
- self.inner.tx.push(value);
-
- // Notify the rx task
- self.inner.rx_waker.wake();
-
- // Release the permit
- self.inner.semaphore.forget(&mut self.permit);
+ self.inner.try_send(value, &mut self.permit)
+ }
+}
- Ok(())
+impl<T> Tx<T, AtomicUsize> {
+ pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> {
+ self.inner.try_send(value, &mut ())
}
}
@@ -317,6 +335,28 @@ where
// ===== impl Chan =====
+impl<T, S> Chan<T, S>
+where
+ S: Semaphore,
+{
+ fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> {
+ if let Err(e) = self.semaphore.try_acquire(permit) {
+ return Err((value, e));
+ }
+
+ // Push the value
+ self.tx.push(value);
+
+ // Notify the rx task
+ self.rx_waker.wake();
+
+ // Release the permit
+ self.semaphore.forget(permit);
+
+ Ok(())
+ }
+}
+
impl<T, S> Drop for Chan<T, S> {
fn drop(&mut self) {
use super::block::Read::Value;
@@ -339,7 +379,7 @@ impl From<TryAcquireError> for TrySendError {
if src.is_closed() {
TrySendError::Closed
} else if src.is_no_permits() {
- TrySendError::NoPermits
+ TrySendError::Full
} else {
unreachable!();
}
@@ -369,8 +409,14 @@ impl Semaphore for (crate::sync::semaphore::Semaphore, usize) {
self.0.available_permits() == self.1
}
- fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> {
- permit.poll_acquire(cx, &self.0).map_err(|_| ())
+ fn poll_acquire(
+ &self,
+ cx: &mut Context<'_>,
+ permit: &mut Permit,
+ ) -> Poll<Result<(), ClosedError>> {
+ permit
+ .poll_acquire(cx, &self.0)
+ .map_err(|_| ClosedError::new())
}
fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
@@ -412,8 +458,12 @@ impl Semaphore for AtomicUsize {
self.load(Acquire) >> 1 == 0
}
- fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> {
- Ready(self.try_acquire(permit).map_err(|_| ()))
+ fn poll_acquire(
+ &self,
+ _cx: &mut Context<'_>,
+ permit: &mut (),
+ ) -> Poll<Result<(), ClosedError>> {
+ Ready(self.try_acquire(permit).map_err(|_| ClosedError::new()))
}
fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs
new file mode 100644
index 00000000..6238f854
--- /dev/null
+++ b/tokio/src/sync/mpsc/error.rs
@@ -0,0 +1,86 @@
+//! Channel error types
+
+use std::error::Error;
+use std::fmt;
+
+/// Error returned by the `Sender`.
+#[derive(Debug)]
+pub struct SendError<T>(pub T);
+
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl<T: fmt::Debug> ::std::error::Error for SendError<T> {}
+
+// ===== TrySendError =====
+
+/// This enumeration is the list of the possible error outcomes for the
+/// [try_send](super::Sender::try_send) method.
+#[derive(Debug)]
+pub enum TrySendError<T> {
+ /// The data could not be sent on the channel because the channel is
+ /// currently full and sending would require blocking.
+ Full(T),
+
+ /// The receive half of the channel was explicitly closed or has been
+ /// dropped.
+ Closed(T),
+}
+
+impl<T: fmt::Debug> Error for TrySendError<T> {}
+
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ fmt,
+ "{}",
+ match self {
+ TrySendError::Full(..) => "no available capacity",
+ TrySendError::Closed(..) => "channel closed",
+ }
+ )
+ }
+}
+
+impl<T> From<SendError<T>> for TrySendError<T> {
+ fn from(src: SendError<T>) -> TrySendError<T> {
+ TrySendError::Closed(src.0)
+ }
+}
+
+// ===== RecvError =====
+
+/// Error returned by `Receiver`.
+#[derive(Debug)]
+pub struct RecvError(());
+
+impl fmt::Display for RecvError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl Error for RecvError {}
+
+// ===== ClosedError =====
+
+/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)].
+#[derive(Debug)]
+pub struct ClosedError(());
+
+impl ClosedError {
+ pub(crate) fn new() -> ClosedError {
+ ClosedError(())
+ }
+}
+
+impl fmt::Display for ClosedError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "channel closed")
+ }
+}
+
+impl Error for ClosedError {}
diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs
index 3b95b954..7927dde6 100644
--- a/tokio/src/sync/mpsc/mod.rs
+++ b/tokio/src/sync/mpsc/mod.rs
@@ -46,12 +46,7 @@ pub(super) mod list;
mod unbounded;
pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender};
-pub mod error {
- //! Channel error types
-
- pub use super::bounded::{RecvError, SendError, TrySendError};
- pub use super::unbounded::{UnboundedRecvError, UnboundedSendError, UnboundedTrySendError};
-}
+pub mod error;
/// The number of values a block can contain.
///
diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs
index 4eb750ef..07a173c2 100644
--- a/tokio/src/sync/mpsc/unbounded.rs
+++ b/tokio/src/sync/mpsc/unbounded.rs
@@ -1,11 +1,10 @@
use crate::loom::sync::atomic::AtomicUsize;
use crate::sync::mpsc::chan;
+use crate::sync::mpsc::error::SendError;
use std::fmt;
use std::task::{Context, Poll};
-use std::pin::Pin;
-
/// Send values to the associated `UnboundedReceiver`.
///
/// Instances are created by the
@@ -47,18 +46,6 @@ impl<T> fmt::Debug for UnboundedReceiver<T> {
}
}
-/// Error returned by the `UnboundedSender`.
-#[derive(Debug)]
-pub struct UnboundedSendError(());
-
-/// Returned by `UnboundedSender::try_send` when the channel has been closed.
-#[derive(Debug)]
-pub struct UnboundedTrySendError<T>(T);
-
-/// Error returned by `UnboundedReceiver`.
-#[derive(Debug)]
-pub struct UnboundedRecvError(());
-
/// Create an unbounded mpsc channel for communicating between asynchronous
/// tasks.
///
@@ -86,7 +73,7 @@ impl<T> UnboundedReceiver<T> {
UnboundedReceiver { chan }
}
- #[doc(hidden)] // TODO: remove
+ #[doc(hidden)] // TODO: doc
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}
@@ -103,10 +90,10 @@ impl<T> UnboundedReceiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::unbounded_channel();
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
///
/// tokio::spawn(async move {
- /// tx.try_send("hello").unwrap();
+ /// tx.send("hello").unwrap();
/// });
///
/// assert_eq!(Some("hello"), rx.recv().await);
@@ -121,17 +108,17 @@ impl<T> UnboundedReceiver<T> {
///
/// #[tokio::main]
/// async fn main() {
- /// let (mut tx, mut rx) = mpsc::unbounded_channel();
+ /// let (tx, mut rx) = mpsc::unbounded_channel();
///
- /// tx.try_send("hello").unwrap();
- /// tx.try_send("world").unwrap();
+ /// tx.send("hello").unwrap();
+ /// tx.send("world").unwrap();
///
/// assert_eq!(Some("hello"), rx.recv().await);
/// assert_eq!(Some("world"), rx.recv().await);
/// }
/// ```
pub async fn recv(&mut self) -> Option<T> {
- use futures_util::future::poll_fn;
+ use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
@@ -145,11 +132,12 @@ impl<T> UnboundedReceiver<T> {
}
}
+#[cfg(feature = "stream")]
impl<T> futures_core::Stream for UnboundedReceiver<T> {
type Item = T;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- self.chan.recv(cx)
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ self.poll_recv(cx)
}
}
@@ -159,72 +147,8 @@ impl<T> UnboundedSender<T> {
}
/// Attempts to send a message on this `UnboundedSender` without blocking.
- pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> {
- self.chan.try_send(message)?;
+ pub fn send(&self, message: T) -> Result<(), SendError<T>> {
+ self.chan.send_unbounded(message)?;
Ok(())
}
}
-
-impl<T> futures_sink::Sink<T> for UnboundedSender<T> {
- type Error = UnboundedSendError;
-
- fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
- self.try_send(msg).map_err(|_| UnboundedSendError(()))
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-// ===== impl UnboundedSendError =====
-
-impl fmt::Display for UnboundedSendError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for UnboundedSendError {}
-
-// ===== impl TrySendError =====
-
-impl<T> UnboundedTrySendError<T> {
- /// Get the inner value.
- pub fn into_inner(self) -> T {
- self.0
- }
-}
-
-impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {}
-
-impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> {
- fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> {
- assert_eq!(chan::TrySendError::Closed, err);
- UnboundedTrySendError(value)
- }
-}
-
-// ===== impl UnboundedRecvError =====
-
-impl fmt::Display for UnboundedRecvError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "channel closed")
- }
-}
-
-impl ::std::error::Error for UnboundedRecvError {}
diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs
index ae45c666..b06f22b4 100644
--- a/tokio/src/sync/mutex.rs
+++ b/tokio/src/sync/mutex.rs
@@ -29,9 +29,9 @@
//! [`Mutex`]: struct.Mutex.html
//! [`MutexGuard`]: struct.MutexGuard.html
+use crate::future::poll_fn;
use crate::sync::semaphore;
-use futures_util::future::poll_fn;
use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs
index 3c757e9e..7b84f319 100644
--- a/tokio/src/sync/oneshot.rs
+++ b/tokio/src/sync/oneshot.rs
@@ -4,7 +4,6 @@ use crate::loom::cell::CausalCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Arc;
-use futures_core::ready;
use std::fmt;
use std::future::Future;
use std::mem::MaybeUninit;
@@ -225,7 +224,7 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&mut self) {
- use futures_util::future::poll_fn;
+ use crate::future::poll_fn;
poll_fn(|cx| self.poll_closed(cx)).await
}
diff --git a/tokio/src/sync/tests/loom_atomic_waker.rs b/tokio/src/sync/tests/loom_atomic_waker.rs
index 81e200ff..c148bcbe 100644
--- a/tokio/src/sync/tests/loom_atomic_waker.rs
+++ b/tokio/src/sync/tests/loom_atomic_waker.rs
@@ -1,6 +1,6 @@
use crate::sync::task::AtomicWaker;
-use futures_util::future::poll_fn;
+use futures::future::poll_fn;
use loom::future::block_on;
use loom::sync::atomic::AtomicUsize;
use loom::thread;
diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs
index 748ae9e1..8fd6d14b 100644
--- a/tokio/src/sync/tests/loom_mpsc.rs
+++ b/tokio/src/sync/tests/loom_mpsc.rs
@@ -1,6 +1,6 @@
use crate::sync::mpsc;
-use futures_util::future::poll_fn;
+use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;
diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs
index 52104736..dfa7459d 100644
--- a/tokio/src/sync/tests/loom_oneshot.rs
+++ b/tokio/src/sync/tests/loom_oneshot.rs
@@ -1,6 +1,6 @@
use crate::sync::oneshot;
-use futures_util::future::poll_fn;
+use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;
use std::task::Poll::{Pending, Ready};
diff --git a/tokio/src/sync/tests/loom_semaphore.rs b/tokio/src/sync/tests/loom_semaphore.rs
index d14c7668..7b8de0f0 100644
--- a/tokio/src/sync/tests/loom_semaphore.rs
+++ b/tokio/src/sync/tests/loom_semaphore.rs
@@ -1,7 +1,6 @@
use crate::sync::semaphore::*;
-use futures_core::ready;
-use futures_util::future::poll_fn;
+use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;
use std::future::Future;
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs
index 928c2c46..d8e2cc35 100644
--- a/tokio/src/sync/watch.rs
+++ b/tokio/src/sync/watch.rs
@@ -51,20 +51,16 @@
//! [`Sender::closed`]: struct.Sender.html#method.closed
//! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref
+use crate::future::poll_fn;
use crate::sync::task::AtomicWaker;
-use core::task::Poll::{Pending, Ready};
-use core::task::{Context, Poll};
use fnv::FnvHashMap;
-use futures_util::future::poll_fn;
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
-
-use futures_core::ready;
-use futures_util::pin_mut;
-use std::pin::Pin;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
/// Receives values from the associated [`Sender`](struct.Sender.html).
///
@@ -235,77 +231,50 @@ impl<T> Receiver<T> {
Ref { inner }
}
- /// Attempts to receive the latest value sent via the channel.
- ///
- /// If a new, unobserved, value has been sent, a reference to it is
- /// returned. If no new value has been sent, then `Pending` is returned and
- /// the current task is notified once a new value is sent.
- ///
- /// Only the **most recent** value is returned. If the receiver is falling
- /// behind the sender, intermediate values are dropped.
- pub async fn recv_ref(&mut self) -> Option<Ref<'_, T>> {
- let shared = &self.shared;
- let inner = &self.inner;
- let version = self.ver;
-
- match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await {
- Some((lock, version)) => {
- self.ver = version;
- Some(lock)
- }
- None => None,
- }
- }
-}
+ // TODO: document
+ #[doc(hidden)]
+ pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
+ // Make sure the task is up to date
+ self.inner.waker.register_by_ref(cx.waker());
-fn poll_lock<'a, T>(
- cx: &mut Context<'_>,
- shared: &'a Arc<Shared<T>>,
- inner: &Arc<WatchInner>,
- ver: usize,
-) -> Poll<Option<(Ref<'a, T>, usize)>> {
- // Make sure the task is up to date
- inner.waker.register_by_ref(cx.waker());
+ let state = self.shared.version.load(SeqCst);
+ let version = state & !CLOSED;
- let state = shared.version.load(SeqCst);
- let version = state & !CLOSED;
+ if version != self.ver {
+ let inner = self.shared.value.read().unwrap();
+ self.ver = version;
- if version != ver {
- let inner = shared.value.read().unwrap();
+ return Ready(Some(Ref { inner }));
+ }
- return Ready(Some((Ref { inner }, version)));
- }
+ if CLOSED == state & CLOSED {
+ // The `Store` handle has been dropped.
+ return Ready(None);
+ }
- if CLOSED == state & CLOSED {
- // The `Store` handle has been dropped.
- return Ready(None);
+ Pending
}
-
- Pending
}
impl<T: Clone> Receiver<T> {
/// Attempts to clone the latest value sent via the channel.
- ///
- /// This is equivalent to calling `clone()` on the value returned by
- /// `recv_ref()`.
- #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274
pub async fn recv(&mut self) -> Option<T> {
- self.recv_ref().await.map(|v_ref| v_ref.clone())
+ poll_fn(|cx| {
+ let v_ref = ready!(self.poll_recv_ref(cx));
+ Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
+ })
+ .await
}
}
+#[cfg(feature = "stream")]
impl<T: Clone> futures_core::Stream for Receiver<T> {
type Item = T;
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- use std::future::Future;
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let v_ref = ready!(self.poll_recv_ref(cx));
- let fut = self.get_mut().recv();
- pin_mut!(fut);
-
- let item = ready!(fut.poll(cx));
- Ready(item.map(|v_ref| v_ref))
+ Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone()))
}
}
@@ -394,27 +363,6 @@ impl<T> Sender<T> {
}
}
-impl<T> futures_sink::Sink<T> for Sender<T> {
- type Error = error::SendError<T>;
-
- fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Ready(Ok(()))
- }
-
- fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
- self.as_ref().get_ref().broadcast(item)?;
- Ok(())
- }
-
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Ready(Ok(()))
- }
-
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Ready(Ok(()))
- }
-}
-
/// Notify all watchers of a change
fn notify_all<T>(shared: &Shared<T>) {
let watchers = shared.watchers.lock().unwrap();