path: root/tokio/src/sync/
diff options
authorCarl Lerche <>2019-11-15 22:11:13 -0800
committerGitHub <>2019-11-15 22:11:13 -0800
commit8a7e57786a5dca139f5b4261685e22991ded0859 (patch)
treeb69d1c48f8a760a58fc7ccfe0376d9812a88d303 /tokio/src/sync/
parent930679587ae42e4df3113159ccf33fb5923dd73a (diff)
Limit `futures` dependency to `Stream` via feature flag (#1774)
In an effort to reach API stability, the `tokio` crate is shedding its _public_ dependencies on crates that are either a) do not provide a stable (1.0+) release with longevity guarantees or b) match the `tokio` release cadence. Of course, implementing `std` traits fits the requirements. The on exception, for now, is the `Stream` trait found in `futures_core`. It is expected that this trait will not change much and be moved into `std. Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain a dependency on this trait given how foundational it is. Since the `Stream` implementation is optional, types that are logically streams provide `async fn next_*` functions to obtain the next value. Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`. Additionally, some misc cleanup is also done: - `tokio::io::io` -> `tokio::io::util`. - `delay` -> `delay_until`. - `Timeout::new` -> `timeout(...)`. - `signal::ctrl_c()` returns a future instead of a stream. - `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait). - `time::Throttle` is removed (due to lack of `Stream` trait). - Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
Diffstat (limited to 'tokio/src/sync/')
1 files changed, 29 insertions, 81 deletions
diff --git a/tokio/src/sync/ b/tokio/src/sync/
index 928c2c46..d8e2cc35 100644
--- a/tokio/src/sync/
+++ b/tokio/src/sync/
@@ -51,20 +51,16 @@
//! [`Sender::closed`]: struct.Sender.html#method.closed
//! [`Receiver::get_ref`]: struct.Receiver.html#method.get_ref
+use crate::future::poll_fn;
use crate::sync::task::AtomicWaker;
-use core::task::Poll::{Pending, Ready};
-use core::task::{Context, Poll};
use fnv::FnvHashMap;
-use futures_util::future::poll_fn;
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak};
-use futures_core::ready;
-use futures_util::pin_mut;
-use std::pin::Pin;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
/// Receives values from the associated [`Sender`](struct.Sender.html).
@@ -235,77 +231,50 @@ impl<T> Receiver<T> {
Ref { inner }
- /// Attempts to receive the latest value sent via the channel.
- ///
- /// If a new, unobserved, value has been sent, a reference to it is
- /// returned. If no new value has been sent, then `Pending` is returned and
- /// the current task is notified once a new value is sent.
- ///
- /// Only the **most recent** value is returned. If the receiver is falling
- /// behind the sender, intermediate values are dropped.
- pub async fn recv_ref(&mut self) -> Option<Ref<'_, T>> {
- let shared = &self.shared;
- let inner = &self.inner;
- let version = self.ver;
- match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await {
- Some((lock, version)) => {
- self.ver = version;
- Some(lock)
- }
- None => None,
- }
- }
+ // TODO: document
+ #[doc(hidden)]
+ pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'a, T>>> {
+ // Make sure the task is up to date
+ self.inner.waker.register_by_ref(cx.waker());
-fn poll_lock<'a, T>(
- cx: &mut Context<'_>,
- shared: &'a Arc<Shared<T>>,
- inner: &Arc<WatchInner>,
- ver: usize,
-) -> Poll<Option<(Ref<'a, T>, usize)>> {
- // Make sure the task is up to date
- inner.waker.register_by_ref(cx.waker());
+ let state = self.shared.version.load(SeqCst);
+ let version = state & !CLOSED;
- let state = shared.version.load(SeqCst);
- let version = state & !CLOSED;
+ if version != self.ver {
+ let inner =;
+ self.ver = version;
- if version != ver {
- let inner =;
+ return Ready(Some(Ref { inner }));
+ }
- return Ready(Some((Ref { inner }, version)));
- }
+ if CLOSED == state & CLOSED {
+ // The `Store` handle has been dropped.
+ return Ready(None);
+ }
- if CLOSED == state & CLOSED {
- // The `Store` handle has been dropped.
- return Ready(None);
+ Pending
- Pending
impl<T: Clone> Receiver<T> {
/// Attempts to clone the latest value sent via the channel.
- ///
- /// This is equivalent to calling `clone()` on the value returned by
- /// `recv_ref()`.
- #[allow(clippy::map_clone)] // false positive:
pub async fn recv(&mut self) -> Option<T> {
- self.recv_ref()|v_ref| v_ref.clone())
+ poll_fn(|cx| {
+ let v_ref = ready!(self.poll_recv_ref(cx));
+ Poll::Ready(|v_ref| (*v_ref).clone()))
+ })
+ .await
+#[cfg(feature = "stream")]
impl<T: Clone> futures_core::Stream for Receiver<T> {
type Item = T;
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- use std::future::Future;
+ fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let v_ref = ready!(self.poll_recv_ref(cx));
- let fut = self.get_mut().recv();
- pin_mut!(fut);
- let item = ready!(fut.poll(cx));
- Ready(|v_ref| v_ref))
+ Poll::Ready(|v_ref| (*v_ref).clone()))
@@ -394,27 +363,6 @@ impl<T> Sender<T> {
-impl<T> futures_sink::Sink<T> for Sender<T> {
- type Error = error::SendError<T>;
- fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Ready(Ok(()))
- }
- fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
- self.as_ref().get_ref().broadcast(item)?;
- Ok(())
- }
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Ready(Ok(()))
- }
- fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- Ready(Ok(()))
- }
/// Notify all watchers of a change
fn notify_all<T>(shared: &Shared<T>) {
let watchers = shared.watchers.lock().unwrap();