From 8a7e57786a5dca139f5b4261685e22991ded0859 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 15 Nov 2019 22:11:13 -0800 Subject: Limit `futures` dependency to `Stream` via feature flag (#1774) In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns). --- tokio/src/sync/watch.rs | 110 +++++++++++++----------------------------------- 1 file changed, 29 insertions(+), 81 deletions(-) (limited to 'tokio/src/sync/watch.rs') diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 928c2c46..d8e2cc35 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -51,20 +51,16 @@ //! [`Sender::closed`]: struct.Sender.html#method.closed //! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref +use crate::future::poll_fn; use crate::sync::task::AtomicWaker; -use core::task::Poll::{Pending, Ready}; -use core::task::{Context, Poll}; use fnv::FnvHashMap; -use futures_util::future::poll_fn; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; - -use futures_core::ready; -use futures_util::pin_mut; -use std::pin::Pin; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; /// Receives values from the associated [`Sender`](struct.Sender.html). /// @@ -235,77 +231,50 @@ impl Receiver { Ref { inner } } - /// Attempts to receive the latest value sent via the channel. - /// - /// If a new, unobserved, value has been sent, a reference to it is - /// returned. If no new value has been sent, then `Pending` is returned and - /// the current task is notified once a new value is sent. - /// - /// Only the **most recent** value is returned. If the receiver is falling - /// behind the sender, intermediate values are dropped. - pub async fn recv_ref(&mut self) -> Option> { - let shared = &self.shared; - let inner = &self.inner; - let version = self.ver; - - match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await { - Some((lock, version)) => { - self.ver = version; - Some(lock) - } - None => None, - } - } -} + // TODO: document + #[doc(hidden)] + pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll>> { + // Make sure the task is up to date + self.inner.waker.register_by_ref(cx.waker()); -fn poll_lock<'a, T>( - cx: &mut Context<'_>, - shared: &'a Arc>, - inner: &Arc, - ver: usize, -) -> Poll, usize)>> { - // Make sure the task is up to date - inner.waker.register_by_ref(cx.waker()); + let state = self.shared.version.load(SeqCst); + let version = state & !CLOSED; - let state = shared.version.load(SeqCst); - let version = state & !CLOSED; + if version != self.ver { + let inner = self.shared.value.read().unwrap(); + self.ver = version; - if version != ver { - let inner = shared.value.read().unwrap(); + return Ready(Some(Ref { inner })); + } - return Ready(Some((Ref { inner }, version))); - } + if CLOSED == state & CLOSED { + // The `Store` handle has been dropped. + return Ready(None); + } - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - return Ready(None); + Pending } - - Pending } impl Receiver { /// Attempts to clone the latest value sent via the channel. - /// - /// This is equivalent to calling `clone()` on the value returned by - /// `recv_ref()`. - #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274 pub async fn recv(&mut self) -> Option { - self.recv_ref().await.map(|v_ref| v_ref.clone()) + poll_fn(|cx| { + let v_ref = ready!(self.poll_recv_ref(cx)); + Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) + }) + .await } } +#[cfg(feature = "stream")] impl futures_core::Stream for Receiver { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use std::future::Future; + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let v_ref = ready!(self.poll_recv_ref(cx)); - let fut = self.get_mut().recv(); - pin_mut!(fut); - - let item = ready!(fut.poll(cx)); - Ready(item.map(|v_ref| v_ref)) + Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) } } @@ -394,27 +363,6 @@ impl Sender { } } -impl futures_sink::Sink for Sender { - type Error = error::SendError; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.as_ref().get_ref().broadcast(item)?; - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ready(Ok(())) - } -} - /// Notify all watchers of a change fn notify_all(shared: &Shared) { let watchers = shared.watchers.lock().unwrap(); -- cgit v1.2.3