diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/sync | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/barrier.rs | 3 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/bounded.rs | 144 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/chan.rs | 98 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/error.rs | 86 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/mod.rs | 7 | ||||
-rw-r--r-- | tokio/src/sync/mpsc/unbounded.rs | 102 | ||||
-rw-r--r-- | tokio/src/sync/mutex.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/oneshot.rs | 3 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_atomic_waker.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_mpsc.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_oneshot.rs | 2 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_semaphore.rs | 3 | ||||
-rw-r--r-- | tokio/src/sync/watch.rs | 110 |
13 files changed, 230 insertions, 334 deletions
diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 1582120e..911e78fe 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -8,8 +8,9 @@ use std::sync::Mutex; /// # #[tokio::main] /// # async fn main() { /// use tokio::sync::Barrier; +/// +/// use futures::future::join_all; /// use std::sync::Arc; -/// use futures_util::future::join_all; /// /// let mut handles = Vec::with_capacity(10); /// let barrier = Arc::new(Barrier::new(10)); diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 787dd507..523dde75 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,8 +1,8 @@ use crate::sync::mpsc::chan; +use crate::sync::mpsc::error::{ClosedError, SendError, TrySendError}; use crate::sync::semaphore; use std::fmt; -use std::pin::Pin; use std::task::{Context, Poll}; /// Send values to the associated `Receiver`. @@ -44,27 +44,6 @@ impl<T> fmt::Debug for Receiver<T> { } } -/// Error returned by the `Sender`. -#[derive(Debug)] -pub struct SendError(()); - -/// Error returned by `Sender::try_send`. -#[derive(Debug)] -pub struct TrySendError<T> { - kind: ErrorKind, - value: T, -} - -#[derive(Debug)] -enum ErrorKind { - Closed, - NoCapacity, -} - -/// Error returned by `Receiver`. -#[derive(Debug)] -pub struct RecvError(()); - /// Create a bounded mpsc channel for communicating between asynchronous tasks, /// returning the sender/receiver halves. /// @@ -161,12 +140,12 @@ impl<T> Receiver<T> { /// } /// ``` pub async fn recv(&mut self) -> Option<T> { - use futures_util::future::poll_fn; + use crate::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } - #[doc(hidden)] // TODO: remove + #[doc(hidden)] // TODO: document pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -180,11 +159,14 @@ impl<T> Receiver<T> { } } +impl<T> Unpin for Receiver<T> {} + +#[cfg(feature = "stream")] impl<T> futures_core::Stream for Receiver<T> { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.get_mut().poll_recv(cx) + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.poll_recv(cx) } } @@ -193,9 +175,9 @@ impl<T> Sender<T> { Sender { chan } } - #[doc(hidden)] // TODO: remove - pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> { - self.chan.poll_ready(cx).map_err(|_| SendError(())) + #[doc(hidden)] // TODO: document + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { + self.chan.poll_ready(cx).map_err(|_| ClosedError::new()) } /// Attempts to send a message on this `Sender`, returning the message @@ -233,105 +215,17 @@ impl<T> Sender<T> { /// } /// } /// ``` - pub async fn send(&mut self, value: T) -> Result<(), SendError> { - use futures_util::future::poll_fn; - - poll_fn(|cx| self.poll_ready(cx)).await?; - - self.try_send(value).map_err(|_| SendError(())) - } -} - -impl<T> futures_sink::Sink<T> for Sender<T> { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Sender::poll_ready(self.get_mut(), cx) - } - - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.as_mut().try_send(msg).map_err(|err| { - assert!(err.is_full(), "call `poll_ready` before sending"); - SendError(()) - }) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } -} - -// ===== impl SendError ===== - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for SendError {} - -// ===== impl TrySendError ===== - -impl<T> TrySendError<T> { - /// Get the inner value. - pub fn into_inner(self) -> T { - self.value - } + pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> { + use crate::future::poll_fn; - /// Did the send fail because the channel has been closed? - pub fn is_closed(&self) -> bool { - if let ErrorKind::Closed = self.kind { - true - } else { - false + if poll_fn(|cx| self.poll_ready(cx)).await.is_err() { + return Err(SendError(value)); } - } - /// Did the send fail because the channel was at capacity? - pub fn is_full(&self) -> bool { - if let ErrorKind::NoCapacity = self.kind { - true - } else { - false + match self.try_send(value) { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => unreachable!(), + Err(TrySendError::Closed(value)) => Err(SendError(value)), } } } - -impl<T: fmt::Debug> fmt::Display for TrySendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let descr = match self.kind { - ErrorKind::Closed => "channel closed", - ErrorKind::NoCapacity => "no available capacity", - }; - write!(fmt, "{}", descr) - } -} - -impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {} - -impl<T> From<(T, chan::TrySendError)> for TrySendError<T> { - fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> { - TrySendError { - value, - kind: match err { - chan::TrySendError::Closed => ErrorKind::Closed, - chan::TrySendError::NoPermits => ErrorKind::NoCapacity, - }, - } - } -} - -// ===== impl RecvError ===== - -impl fmt::Display for RecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for RecvError {} diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index ad0d99a8..03f35339 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -1,5 +1,9 @@ -use crate::loom::{cell::CausalCell, future::AtomicWaker, sync::atomic::AtomicUsize, sync::Arc}; -use crate::sync::mpsc::list; +use crate::loom::cell::CausalCell; +use crate::loom::future::AtomicWaker; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Arc; +use crate::sync::mpsc::error::ClosedError; +use crate::sync::mpsc::{error, list}; use std::fmt; use std::process; @@ -43,7 +47,25 @@ where #[derive(Debug, Eq, PartialEq)] pub(crate) enum TrySendError { Closed, - NoPermits, + Full, +} + +impl<T> From<(T, TrySendError)> for error::SendError<T> { + fn from(src: (T, TrySendError)) -> error::SendError<T> { + match src.1 { + TrySendError::Closed => error::SendError(src.0), + TrySendError::Full => unreachable!(), + } + } +} + +impl<T> From<(T, TrySendError)> for error::TrySendError<T> { + fn from(src: (T, TrySendError)) -> error::TrySendError<T> { + match src.1 { + TrySendError::Closed => error::TrySendError::Closed(src.0), + TrySendError::Full => error::TrySendError::Full(src.0), + } + } } pub(crate) trait Semaphore { @@ -59,8 +81,11 @@ pub(crate) trait Semaphore { fn add_permit(&self); - fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit) - -> Poll<Result<(), ()>>; + fn poll_acquire( + &self, + cx: &mut Context<'_>, + permit: &mut Self::Permit, + ) -> Poll<Result<(), ClosedError>>; fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; @@ -161,26 +186,19 @@ where } } - pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> { + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>> { self.inner.semaphore.poll_acquire(cx, &mut self.permit) } /// Send a message and notify the receiver. pub(crate) fn try_send(&mut self, value: T) -> Result<(), (T, TrySendError)> { - if let Err(e) = self.inner.semaphore.try_acquire(&mut self.permit) { - return Err((value, e)); - } - - // Push the value - self.inner.tx.push(value); - - // Notify the rx task - self.inner.rx_waker.wake(); - - // Release the permit - self.inner.semaphore.forget(&mut self.permit); + self.inner.try_send(value, &mut self.permit) + } +} - Ok(()) +impl<T> Tx<T, AtomicUsize> { + pub(crate) fn send_unbounded(&self, value: T) -> Result<(), (T, TrySendError)> { + self.inner.try_send(value, &mut ()) } } @@ -317,6 +335,28 @@ where // ===== impl Chan ===== +impl<T, S> Chan<T, S> +where + S: Semaphore, +{ + fn try_send(&self, value: T, permit: &mut S::Permit) -> Result<(), (T, TrySendError)> { + if let Err(e) = self.semaphore.try_acquire(permit) { + return Err((value, e)); + } + + // Push the value + self.tx.push(value); + + // Notify the rx task + self.rx_waker.wake(); + + // Release the permit + self.semaphore.forget(permit); + + Ok(()) + } +} + impl<T, S> Drop for Chan<T, S> { fn drop(&mut self) { use super::block::Read::Value; @@ -339,7 +379,7 @@ impl From<TryAcquireError> for TrySendError { if src.is_closed() { TrySendError::Closed } else if src.is_no_permits() { - TrySendError::NoPermits + TrySendError::Full } else { unreachable!(); } @@ -369,8 +409,14 @@ impl Semaphore for (crate::sync::semaphore::Semaphore, usize) { self.0.available_permits() == self.1 } - fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> { - permit.poll_acquire(cx, &self.0).map_err(|_| ()) + fn poll_acquire( + &self, + cx: &mut Context<'_>, + permit: &mut Permit, + ) -> Poll<Result<(), ClosedError>> { + permit + .poll_acquire(cx, &self.0) + .map_err(|_| ClosedError::new()) } fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { @@ -412,8 +458,12 @@ impl Semaphore for AtomicUsize { self.load(Acquire) >> 1 == 0 } - fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> { - Ready(self.try_acquire(permit).map_err(|_| ())) + fn poll_acquire( + &self, + _cx: &mut Context<'_>, + permit: &mut (), + ) -> Poll<Result<(), ClosedError>> { + Ready(self.try_acquire(permit).map_err(|_| ClosedError::new())) } fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { diff --git a/tokio/src/sync/mpsc/error.rs b/tokio/src/sync/mpsc/error.rs new file mode 100644 index 00000000..6238f854 --- /dev/null +++ b/tokio/src/sync/mpsc/error.rs @@ -0,0 +1,86 @@ +//! Channel error types + +use std::error::Error; +use std::fmt; + +/// Error returned by the `Sender`. +#[derive(Debug)] +pub struct SendError<T>(pub T); + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl<T: fmt::Debug> ::std::error::Error for SendError<T> {} + +// ===== TrySendError ===== + +/// This enumeration is the list of the possible error outcomes for the +/// [try_send](super::Sender::try_send) method. +#[derive(Debug)] +pub enum TrySendError<T> { + /// The data could not be sent on the channel because the channel is + /// currently full and sending would require blocking. + Full(T), + + /// The receive half of the channel was explicitly closed or has been + /// dropped. + Closed(T), +} + +impl<T: fmt::Debug> Error for TrySendError<T> {} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + fmt, + "{}", + match self { + TrySendError::Full(..) => "no available capacity", + TrySendError::Closed(..) => "channel closed", + } + ) + } +} + +impl<T> From<SendError<T>> for TrySendError<T> { + fn from(src: SendError<T>) -> TrySendError<T> { + TrySendError::Closed(src.0) + } +} + +// ===== RecvError ===== + +/// Error returned by `Receiver`. +#[derive(Debug)] +pub struct RecvError(()); + +impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl Error for RecvError {} + +// ===== ClosedError ===== + +/// Erorr returned by [`Sender::poll_ready`](super::Sender::poll_ready)]. +#[derive(Debug)] +pub struct ClosedError(()); + +impl ClosedError { + pub(crate) fn new() -> ClosedError { + ClosedError(()) + } +} + +impl fmt::Display for ClosedError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } +} + +impl Error for ClosedError {} diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index 3b95b954..7927dde6 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -46,12 +46,7 @@ pub(super) mod list; mod unbounded; pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -pub mod error { - //! Channel error types - - pub use super::bounded::{RecvError, SendError, TrySendError}; - pub use super::unbounded::{UnboundedRecvError, UnboundedSendError, UnboundedTrySendError}; -} +pub mod error; /// The number of values a block can contain. /// diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 4eb750ef..07a173c2 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -1,11 +1,10 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::sync::mpsc::chan; +use crate::sync::mpsc::error::SendError; use std::fmt; use std::task::{Context, Poll}; -use std::pin::Pin; - /// Send values to the associated `UnboundedReceiver`. /// /// Instances are created by the @@ -47,18 +46,6 @@ impl<T> fmt::Debug for UnboundedReceiver<T> { } } -/// Error returned by the `UnboundedSender`. -#[derive(Debug)] -pub struct UnboundedSendError(()); - -/// Returned by `UnboundedSender::try_send` when the channel has been closed. -#[derive(Debug)] -pub struct UnboundedTrySendError<T>(T); - -/// Error returned by `UnboundedReceiver`. -#[derive(Debug)] -pub struct UnboundedRecvError(()); - /// Create an unbounded mpsc channel for communicating between asynchronous /// tasks. /// @@ -86,7 +73,7 @@ impl<T> UnboundedReceiver<T> { UnboundedReceiver { chan } } - #[doc(hidden)] // TODO: remove + #[doc(hidden)] // TODO: doc pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.chan.recv(cx) } @@ -103,10 +90,10 @@ impl<T> UnboundedReceiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// let (tx, mut rx) = mpsc::unbounded_channel(); /// /// tokio::spawn(async move { - /// tx.try_send("hello").unwrap(); + /// tx.send("hello").unwrap(); /// }); /// /// assert_eq!(Some("hello"), rx.recv().await); @@ -121,17 +108,17 @@ impl<T> UnboundedReceiver<T> { /// /// #[tokio::main] /// async fn main() { - /// let (mut tx, mut rx) = mpsc::unbounded_channel(); + /// let (tx, mut rx) = mpsc::unbounded_channel(); /// - /// tx.try_send("hello").unwrap(); - /// tx.try_send("world").unwrap(); + /// tx.send("hello").unwrap(); + /// tx.send("world").unwrap(); /// /// assert_eq!(Some("hello"), rx.recv().await); /// assert_eq!(Some("world"), rx.recv().await); /// } /// ``` pub async fn recv(&mut self) -> Option<T> { - use futures_util::future::poll_fn; + use crate::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } @@ -145,11 +132,12 @@ impl<T> UnboundedReceiver<T> { } } +#[cfg(feature = "stream")] impl<T> futures_core::Stream for UnboundedReceiver<T> { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - self.chan.recv(cx) + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + self.poll_recv(cx) } } @@ -159,72 +147,8 @@ impl<T> UnboundedSender<T> { } /// Attempts to send a message on this `UnboundedSender` without blocking. - pub fn try_send(&mut self, message: T) -> Result<(), UnboundedTrySendError<T>> { - self.chan.try_send(message)?; + pub fn send(&self, message: T) -> Result<(), SendError<T>> { + self.chan.send_unbounded(message)?; Ok(()) } } - -impl<T> futures_sink::Sink<T> for UnboundedSender<T> { - type Error = UnboundedSendError; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { - self.try_send(msg).map_err(|_| UnboundedSendError(())) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Poll::Ready(Ok(())) - } -} - -// ===== impl UnboundedSendError ===== - -impl fmt::Display for UnboundedSendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for UnboundedSendError {} - -// ===== impl TrySendError ===== - -impl<T> UnboundedTrySendError<T> { - /// Get the inner value. - pub fn into_inner(self) -> T { - self.0 - } -} - -impl<T: fmt::Debug> fmt::Display for UnboundedTrySendError<T> { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl<T: fmt::Debug> ::std::error::Error for UnboundedTrySendError<T> {} - -impl<T> From<(T, chan::TrySendError)> for UnboundedTrySendError<T> { - fn from((value, err): (T, chan::TrySendError)) -> UnboundedTrySendError<T> { - assert_eq!(chan::TrySendError::Closed, err); - UnboundedTrySendError(value) - } -} - -// ===== impl UnboundedRecvError ===== - -impl fmt::Display for UnboundedRecvError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "channel closed") - } -} - -impl ::std::error::Error for UnboundedRecvError {} diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index ae45c666..b06f22b4 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -29,9 +29,9 @@ //! [`Mutex`]: struct.Mutex.html //! [`MutexGuard`]: struct.MutexGuard.html +use crate::future::poll_fn; use crate::sync::semaphore; -use futures_util::future::poll_fn; use std::cell::UnsafeCell; use std::fmt; use std::ops::{Deref, DerefMut}; diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 3c757e9e..7b84f319 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -4,7 +4,6 @@ use crate::loom::cell::CausalCell; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; -use futures_core::ready; use std::fmt; use std::future::Future; use std::mem::MaybeUninit; @@ -225,7 +224,7 @@ impl<T> Sender<T> { /// } /// ``` pub async fn closed(&mut self) { - use futures_util::future::poll_fn; + use crate::future::poll_fn; poll_fn(|cx| self.poll_closed(cx)).await } diff --git a/tokio/src/sync/tests/loom_atomic_waker.rs b/tokio/src/sync/tests/loom_atomic_waker.rs index 81e200ff..c148bcbe 100644 --- a/tokio/src/sync/tests/loom_atomic_waker.rs +++ b/tokio/src/sync/tests/loom_atomic_waker.rs @@ -1,6 +1,6 @@ use crate::sync::task::AtomicWaker; -use futures_util::future::poll_fn; +use futures::future::poll_fn; use loom::future::block_on; use loom::sync::atomic::AtomicUsize; use loom::thread; diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index 748ae9e1..8fd6d14b 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -1,6 +1,6 @@ use crate::sync::mpsc; -use futures_util::future::poll_fn; +use futures::future::poll_fn; use loom::future::block_on; use loom::thread; diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index 52104736..dfa7459d 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -1,6 +1,6 @@ use crate::sync::oneshot; -use futures_util::future::poll_fn; +use futures::future::poll_fn; use loom::future::block_on; use loom::thread; use std::task::Poll::{Pending, Ready}; diff --git a/tokio/src/sync/tests/loom_semaphore.rs b/tokio/src/sync/tests/loom_semaphore.rs index d14c7668..7b8de0f0 100644 --- a/tokio/src/sync/tests/loom_semaphore.rs +++ b/tokio/src/sync/tests/loom_semaphore.rs @@ -1,7 +1,6 @@ use crate::sync::semaphore::*; -use futures_core::ready; -use futures_util::future::poll_fn; +use futures::future::poll_fn; use loom::future::block_on; use loom::thread; use std::future::Future; diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 928c2c46..d8e2cc35 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -51,20 +51,16 @@ //! [`Sender::closed`]: struct.Sender.html#method.closed //! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref +use crate::future::poll_fn; use crate::sync::task::AtomicWaker; -use core::task::Poll::{Pending, Ready}; -use core::task::{Context, Poll}; use fnv::FnvHashMap; -use futures_util::future::poll_fn; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; - -use futures_core::ready; -use futures_util::pin_mut; -use std::pin::Pin; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; /// Receives values from the associated [`Sender`](struct.Sender.html). /// @@ -235,77 +231,50 @@ impl<T> Receiver<T> { Ref { inner } } - /// Attempts to receive the latest value sent via the channel. - /// - /// If a new, unobserved, value has been sent, a reference to it is - /// returned. If no new value has been sent, then `Pending` is returned and - /// the current task is notified once a new value is sent. - /// - /// Only the **most recent** value is returned. If the receiver is falling - /// behind the sender, intermediate values are dropped. - pub async fn recv_ref(&mut self) -> Option<Ref<'_, T>> { - let shared = &self.shared; - let inner = &self.inner; - let version = self.ver; - - match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await { - Some((lock, version)) => { - self.ver = version; - Some(lock) - } - None => None, - } - } -} + // TODO: document + #[doc(hidden)] + pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> { + // Make sure the task is up to date + self.inner.waker.register_by_ref(cx.waker()); -fn poll_lock<'a, T>( - cx: &mut Context<'_>, - shared: &'a Arc<Shared<T>>, - inner: &Arc<WatchInner>, - ver: usize, -) -> Poll<Option<(Ref<'a, T>, usize)>> { - // Make sure the task is up to date - inner.waker.register_by_ref(cx.waker()); + let state = self.shared.version.load(SeqCst); + let version = state & !CLOSED; - let state = shared.version.load(SeqCst); - let version = state & !CLOSED; + if version != self.ver { + let inner = self.shared.value.read().unwrap(); + self.ver = version; - if version != ver { - let inner = shared.value.read().unwrap(); + return Ready(Some(Ref { inner })); + } - return Ready(Some((Ref { inner }, version))); - } + if CLOSED == state & CLOSED { + // The `Store` handle has been dropped. + return Ready(None); + } - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - return Ready(None); + Pending } - - Pending } impl<T: Clone> Receiver<T> { /// Attempts to clone the latest value sent via the channel. - /// - /// This is equivalent to calling `clone()` on the value returned by - /// `recv_ref()`. - #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274 pub async fn recv(&mut self) -> Option<T> { - self.recv_ref().await.map(|v_ref| v_ref.clone()) + poll_fn(|cx| { + let v_ref = ready!(self.poll_recv_ref(cx)); + Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) + }) + .await } } +#[cfg(feature = "stream")] impl<T: Clone> futures_core::Stream for Receiver<T> { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - use std::future::Future; + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let v_ref = ready!(self.poll_recv_ref(cx)); - let fut = self.get_mut().recv(); - pin_mut!(fut); - - let item = ready!(fut.poll(cx)); - Ready(item.map(|v_ref| v_ref)) + Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) } } @@ -394,27 +363,6 @@ impl<T> Sender<T> { } } -impl<T> futures_sink::Sink<T> for Sender<T> { - type Error = error::SendError<T>; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.as_ref().get_ref().broadcast(item)?; - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Ready(Ok(())) - } -} - /// Notify all watchers of a change fn notify_all<T>(shared: &Shared<T>) { let watchers = shared.watchers.lock().unwrap(); |