diff options
author | Carl Lerche <me@carllerche.com> | 2019-08-06 13:54:56 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-06 13:54:56 -0700 |
commit | 2f43b0a023f155a3efed4b048f5e0822072840f8 (patch) | |
tree | 56412a804ff510e7f0b8d5d00a20b12aabd5d84d /tokio-sync/src/watch.rs | |
parent | 05d00aebb7b8b467571a8cf58cb18ee4de8658c1 (diff) |
sync: polish and update API doc examples (#1398)
- Remove `poll_*` fns from some of the sync types.
- Move `AtomicWaker` and `Lock` to the root of the `sync` crate.
Diffstat (limited to 'tokio-sync/src/watch.rs')
-rw-r--r-- | tokio-sync/src/watch.rs | 121 |
1 files changed, 77 insertions, 44 deletions
diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index 6efe437d..0c2bb243 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -18,20 +18,22 @@ //! # Examples //! //! ``` -//! use tokio::prelude::*; +//! #![feature(async_await)] +//! //! use tokio::sync::watch; //! -//! # tokio::run(futures::future::lazy(|| { -//! let (mut tx, rx) = watch::channel("hello"); +//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +//! let (mut tx, mut rx) = watch::channel("hello"); //! -//! tokio::spawn(rx.for_each(|value| { -//! println!("received = {:?}", value); -//! Ok(()) -//! }).map_err(|_| ())); +//! tokio::spawn(async move { +//! while let Some(value) = rx.recv().await { +//! println!("received = {:?}", value); +//! } +//! }); //! -//! tx.broadcast("world").unwrap(); +//! tx.broadcast("world")?; //! # Ok(()) -//! # })); +//! # } //! ``` //! //! # Closing @@ -59,6 +61,8 @@ use core::task::Poll::{Pending, Ready}; use core::task::{Context, Poll}; use fnv::FnvHashMap; use futures_core::ready; +use futures_util::future::poll_fn; +use futures_util::pin_mut; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; @@ -165,20 +169,22 @@ const CLOSED: usize = 1; /// # Examples /// /// ``` -/// use tokio::prelude::*; +/// #![feature(async_await)] +/// /// use tokio::sync::watch; /// -/// # tokio::run(futures::future::lazy(|| { -/// let (mut tx, rx) = watch::channel("hello"); +/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +/// let (mut tx, mut rx) = watch::channel("hello"); /// -/// tokio::spawn(rx.for_each(|value| { -/// println!("received = {:?}", value); -/// Ok(()) -/// }).map_err(|_| ())); +/// tokio::spawn(async move { +/// while let Some(value) = rx.recv().await { +/// println!("received = {:?}", value); +/// } +/// }); /// -/// tx.broadcast("world").unwrap(); +/// tx.broadcast("world")?; /// # Ok(()) -/// # })); +/// # } /// ``` pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) { const INIT_ID: u64 = 0; @@ -241,39 +247,56 @@ impl<T> Receiver<T> { /// /// Only the **most recent** value is returned. If the receiver is falling /// behind the sender, intermediate values are dropped. - pub fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'_, T>>> { - // Make sure the task is up to date - self.inner.waker.register_by_ref(cx.waker()); - - let state = self.shared.version.load(SeqCst); - let version = state & !CLOSED; + #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn recv_ref<'a>(&'a mut self) -> Option<Ref<'a, T>> { + let shared = &self.shared; + let inner = &self.inner; + let version = self.ver; + + match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await { + Some((lock, version)) => { + self.ver = version; + Some(lock) + } + None => None, + } + } +} - if version != self.ver { - // Track the latest version - self.ver = version; +fn poll_lock<'a, T>( + cx: &mut Context<'_>, + shared: &'a Arc<Shared<T>>, + inner: &Arc<WatchInner>, + ver: usize, +) -> Poll<Option<(Ref<'a, T>, usize)>> { + // Make sure the task is up to date + inner.waker.register_by_ref(cx.waker()); - let inner = self.shared.value.read().unwrap(); + let state = shared.version.load(SeqCst); + let version = state & !CLOSED; - return Ready(Some(Ref { inner })); - } + if version != ver { + let inner = shared.value.read().unwrap(); - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - return Ready(None); - } + return Ready(Some((Ref { inner }, version))); + } - Pending + if CLOSED == state & CLOSED { + // The `Store` handle has been dropped. + return Ready(None); } + + Pending } impl<T: Clone> Receiver<T> { /// Attempts to clone the latest value sent via the channel. /// - /// This is equivalent to calling `Clone` on the value returned by `poll_ref`. - #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274 - pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { - let item = ready!(self.poll_ref(cx)); - Ready(item.map(|v_ref| v_ref.clone())) + /// This is equivalent to calling `clone()` on the value returned by + /// `recv()`. + #[allow(clippy::needless_lifetimes, clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn recv(&mut self) -> Option<T> { + self.recv_ref().await.map(|v_ref| v_ref.clone()) } } @@ -282,8 +305,13 @@ impl<T: Clone> futures_core::Stream for Receiver<T> { type Item = T; #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274 - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - let item = ready!(self.poll_ref(cx)); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + use std::future::Future; + + let fut = self.get_mut().recv(); + pin_mut!(fut); + + let item = ready!(fut.poll(cx)); Ready(item.map(|v_ref| v_ref.clone())) } } @@ -354,11 +382,16 @@ impl<T> Sender<T> { Ok(()) } - /// Returns `Ready` when all receivers have dropped. + /// Completes when all receivers have dropped. /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. - pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { + #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn closed(&mut self) { + poll_fn(|cx| self.poll_close(cx)).await + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self.shared.upgrade() { Some(shared) => { shared.cancel.register_by_ref(cx.waker()); |