summaryrefslogtreecommitdiffstats
path: root/tokio-sync/src/watch.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-sync/src/watch.rs')
-rw-r--r--tokio-sync/src/watch.rs121
1 files changed, 77 insertions, 44 deletions
diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs
index 6efe437d..0c2bb243 100644
--- a/tokio-sync/src/watch.rs
+++ b/tokio-sync/src/watch.rs
@@ -18,20 +18,22 @@
//! # Examples
//!
//! ```
-//! use tokio::prelude::*;
+//! #![feature(async_await)]
+//!
//! use tokio::sync::watch;
//!
-//! # tokio::run(futures::future::lazy(|| {
-//! let (mut tx, rx) = watch::channel("hello");
+//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+//! let (mut tx, mut rx) = watch::channel("hello");
//!
-//! tokio::spawn(rx.for_each(|value| {
-//! println!("received = {:?}", value);
-//! Ok(())
-//! }).map_err(|_| ()));
+//! tokio::spawn(async move {
+//! while let Some(value) = rx.recv().await {
+//! println!("received = {:?}", value);
+//! }
+//! });
//!
-//! tx.broadcast("world").unwrap();
+//! tx.broadcast("world")?;
//! # Ok(())
-//! # }));
+//! # }
//! ```
//!
//! # Closing
@@ -59,6 +61,8 @@ use core::task::Poll::{Pending, Ready};
use core::task::{Context, Poll};
use fnv::FnvHashMap;
use futures_core::ready;
+use futures_util::future::poll_fn;
+use futures_util::pin_mut;
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
@@ -165,20 +169,22 @@ const CLOSED: usize = 1;
/// # Examples
///
/// ```
-/// use tokio::prelude::*;
+/// #![feature(async_await)]
+///
/// use tokio::sync::watch;
///
-/// # tokio::run(futures::future::lazy(|| {
-/// let (mut tx, rx) = watch::channel("hello");
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let (mut tx, mut rx) = watch::channel("hello");
///
-/// tokio::spawn(rx.for_each(|value| {
-/// println!("received = {:?}", value);
-/// Ok(())
-/// }).map_err(|_| ()));
+/// tokio::spawn(async move {
+/// while let Some(value) = rx.recv().await {
+/// println!("received = {:?}", value);
+/// }
+/// });
///
-/// tx.broadcast("world").unwrap();
+/// tx.broadcast("world")?;
/// # Ok(())
-/// # }));
+/// # }
/// ```
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
const INIT_ID: u64 = 0;
@@ -241,39 +247,56 @@ impl<T> Receiver<T> {
///
/// Only the **most recent** value is returned. If the receiver is falling
/// behind the sender, intermediate values are dropped.
- pub fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'_, T>>> {
- // Make sure the task is up to date
- self.inner.waker.register_by_ref(cx.waker());
-
- let state = self.shared.version.load(SeqCst);
- let version = state & !CLOSED;
+ #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
+ pub async fn recv_ref<'a>(&'a mut self) -> Option<Ref<'a, T>> {
+ let shared = &self.shared;
+ let inner = &self.inner;
+ let version = self.ver;
+
+ match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await {
+ Some((lock, version)) => {
+ self.ver = version;
+ Some(lock)
+ }
+ None => None,
+ }
+ }
+}
- if version != self.ver {
- // Track the latest version
- self.ver = version;
+fn poll_lock<'a, T>(
+ cx: &mut Context<'_>,
+ shared: &'a Arc<Shared<T>>,
+ inner: &Arc<WatchInner>,
+ ver: usize,
+) -> Poll<Option<(Ref<'a, T>, usize)>> {
+ // Make sure the task is up to date
+ inner.waker.register_by_ref(cx.waker());
- let inner = self.shared.value.read().unwrap();
+ let state = shared.version.load(SeqCst);
+ let version = state & !CLOSED;
- return Ready(Some(Ref { inner }));
- }
+ if version != ver {
+ let inner = shared.value.read().unwrap();
- if CLOSED == state & CLOSED {
- // The `Store` handle has been dropped.
- return Ready(None);
- }
+ return Ready(Some((Ref { inner }, version)));
+ }
- Pending
+ if CLOSED == state & CLOSED {
+ // The `Store` handle has been dropped.
+ return Ready(None);
}
+
+ Pending
}
impl<T: Clone> Receiver<T> {
/// Attempts to clone the latest value sent via the channel.
///
- /// This is equivalent to calling `Clone` on the value returned by `poll_ref`.
- #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274
- pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
- let item = ready!(self.poll_ref(cx));
- Ready(item.map(|v_ref| v_ref.clone()))
+ /// This is equivalent to calling `clone()` on the value returned by
+ /// `recv()`.
+ #[allow(clippy::needless_lifetimes, clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
+ pub async fn recv(&mut self) -> Option<T> {
+ self.recv_ref().await.map(|v_ref| v_ref.clone())
}
}
@@ -282,8 +305,13 @@ impl<T: Clone> futures_core::Stream for Receiver<T> {
type Item = T;
#[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- let item = ready!(self.poll_ref(cx));
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ use std::future::Future;
+
+ let fut = self.get_mut().recv();
+ pin_mut!(fut);
+
+ let item = ready!(fut.poll(cx));
Ready(item.map(|v_ref| v_ref.clone()))
}
}
@@ -354,11 +382,16 @@ impl<T> Sender<T> {
Ok(())
}
- /// Returns `Ready` when all receivers have dropped.
+ /// Completes when all receivers have dropped.
///
/// This allows the producer to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
- pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
+ pub async fn closed(&mut self) {
+ poll_fn(|cx| self.poll_close(cx)).await
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self.shared.upgrade() {
Some(shared) => {
shared.cancel.register_by_ref(cx.waker());