diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-15 22:11:13 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-15 22:11:13 -0800 |
commit | 8a7e57786a5dca139f5b4261685e22991ded0859 (patch) | |
tree | b69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/sync/watch.rs | |
parent | 930679587ae42e4df3113159ccf33fb5923dd73a (diff) |
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.
The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.
Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.
Additionally, some misc cleanup is also done:
- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
- Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/sync/watch.rs')
-rw-r--r-- | tokio/src/sync/watch.rs | 110 |
1 files changed, 29 insertions, 81 deletions
diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 928c2c46..d8e2cc35 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -51,20 +51,16 @@ //! [`Sender::closed`]: struct.Sender.html#method.closed //! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref +use crate::future::poll_fn; use crate::sync::task::AtomicWaker; -use core::task::Poll::{Pending, Ready}; -use core::task::{Context, Poll}; use fnv::FnvHashMap; -use futures_util::future::poll_fn; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; - -use futures_core::ready; -use futures_util::pin_mut; -use std::pin::Pin; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; /// Receives values from the associated [`Sender`](struct.Sender.html). /// @@ -235,77 +231,50 @@ impl<T> Receiver<T> { Ref { inner } } - /// Attempts to receive the latest value sent via the channel. - /// - /// If a new, unobserved, value has been sent, a reference to it is - /// returned. If no new value has been sent, then `Pending` is returned and - /// the current task is notified once a new value is sent. - /// - /// Only the **most recent** value is returned. If the receiver is falling - /// behind the sender, intermediate values are dropped. - pub async fn recv_ref(&mut self) -> Option<Ref<'_, T>> { - let shared = &self.shared; - let inner = &self.inner; - let version = self.ver; - - match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await { - Some((lock, version)) => { - self.ver = version; - Some(lock) - } - None => None, - } - } -} + // TODO: document + #[doc(hidden)] + pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> { + // Make sure the task is up to date + self.inner.waker.register_by_ref(cx.waker()); -fn poll_lock<'a, T>( - cx: &mut Context<'_>, - shared: &'a Arc<Shared<T>>, - inner: &Arc<WatchInner>, - ver: usize, -) -> Poll<Option<(Ref<'a, T>, usize)>> { - // Make sure the task is up to date - inner.waker.register_by_ref(cx.waker()); + let state = self.shared.version.load(SeqCst); + let version = state & !CLOSED; - let state = shared.version.load(SeqCst); - let version = state & !CLOSED; + if version != self.ver { + let inner = self.shared.value.read().unwrap(); + self.ver = version; - if version != ver { - let inner = shared.value.read().unwrap(); + return Ready(Some(Ref { inner })); + } - return Ready(Some((Ref { inner }, version))); - } + if CLOSED == state & CLOSED { + // The `Store` handle has been dropped. + return Ready(None); + } - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - return Ready(None); + Pending } - - Pending } impl<T: Clone> Receiver<T> { /// Attempts to clone the latest value sent via the channel. - /// - /// This is equivalent to calling `clone()` on the value returned by - /// `recv_ref()`. - #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274 pub async fn recv(&mut self) -> Option<T> { - self.recv_ref().await.map(|v_ref| v_ref.clone()) + poll_fn(|cx| { + let v_ref = ready!(self.poll_recv_ref(cx)); + Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) + }) + .await } } +#[cfg(feature = "stream")] impl<T: Clone> futures_core::Stream for Receiver<T> { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - use std::future::Future; + fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + let v_ref = ready!(self.poll_recv_ref(cx)); - let fut = self.get_mut().recv(); - pin_mut!(fut); - - let item = ready!(fut.poll(cx)); - Ready(item.map(|v_ref| v_ref)) + Poll::Ready(v_ref.map(|v_ref| (*v_ref).clone())) } } @@ -394,27 +363,6 @@ impl<T> Sender<T> { } } -impl<T> futures_sink::Sink<T> for Sender<T> { - type Error = error::SendError<T>; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.as_ref().get_ref().broadcast(item)?; - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - Ready(Ok(())) - } -} - /// Notify all watchers of a change fn notify_all<T>(shared: &Shared<T>) { let watchers = shared.watchers.lock().unwrap(); |