diff options
author | Carl Lerche <me@carllerche.com> | 2019-08-06 13:54:56 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-06 13:54:56 -0700 |
commit | 2f43b0a023f155a3efed4b048f5e0822072840f8 (patch) | |
tree | 56412a804ff510e7f0b8d5d00a20b12aabd5d84d /tokio-sync | |
parent | 05d00aebb7b8b467571a8cf58cb18ee4de8658c1 (diff) |
sync: polish and update API doc examples (#1398)
- Remove `poll_*` fns from some of the sync types.
- Move `AtomicWaker` and `Lock` to the root of the `sync` crate.
Diffstat (limited to 'tokio-sync')
-rw-r--r-- | tokio-sync/Cargo.toml | 6 | ||||
-rw-r--r-- | tokio-sync/src/lib.rs | 7 | ||||
-rw-r--r-- | tokio-sync/src/lock.rs | 72 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/bounded.rs | 70 | ||||
-rw-r--r-- | tokio-sync/src/mpsc/unbounded.rs | 2 | ||||
-rw-r--r-- | tokio-sync/src/oneshot.rs | 44 | ||||
-rw-r--r-- | tokio-sync/src/watch.rs | 121 | ||||
-rw-r--r-- | tokio-sync/tests/atomic_waker.rs | 2 | ||||
-rw-r--r-- | tokio-sync/tests/fuzz_atomic_waker.rs | 2 | ||||
-rw-r--r-- | tokio-sync/tests/fuzz_mpsc.rs | 3 | ||||
-rw-r--r-- | tokio-sync/tests/fuzz_semaphore.rs | 2 | ||||
-rw-r--r-- | tokio-sync/tests/lock.rs | 45 | ||||
-rw-r--r-- | tokio-sync/tests/watch.rs | 291 |
13 files changed, 362 insertions, 305 deletions
diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index 64c7af83..7e593ca8 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -25,10 +25,10 @@ publish = false async-traits = ["futures-sink-preview"] [dependencies] -async-util = { git = "https://github.com/tokio-rs/async" } fnv = "1.0.6" -futures-core-preview = { version = "0.3.0-alpha.17" } -futures-sink-preview = { version = "0.3.0-alpha.17", optional = true } +futures-core-preview = { version = "= 0.3.0-alpha.17" } +futures-sink-preview = { version = "= 0.3.0-alpha.17", optional = true } +futures-util-preview = { version = "= 0.3.0-alpha.17" } [dev-dependencies] env_logger = { version = "0.5", default-features = false } diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs index 00f9b700..9019abfa 100644 --- a/tokio-sync/src/lib.rs +++ b/tokio-sync/src/lib.rs @@ -27,10 +27,13 @@ macro_rules! if_fuzz { }} } -pub mod lock; +mod lock; mod loom; pub mod mpsc; pub mod oneshot; pub mod semaphore; -pub mod task; +mod task; pub mod watch; + +pub use lock::{Lock, LockGuard}; +pub use task::AtomicWaker; diff --git a/tokio-sync/src/lock.rs b/tokio-sync/src/lock.rs index 8b2bc0bc..addef25f 100644 --- a/tokio-sync/src/lock.rs +++ b/tokio-sync/src/lock.rs @@ -1,39 +1,29 @@ //! An asynchronous `Mutex`-like type. //! //! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one -//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the +//! major difference: the [`LockGuard`] returned by `lock` is not tied to the lifetime of the //! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then //! release it at some later point in time. //! //! This allows you to do something along the lines of: //! //! ```rust,no_run -//! use futures::{try_ready, future, Poll, Async, Future, Stream}; -//! use tokio::sync::lock::{Lock, LockGuard}; +//! #![feature(async_await)] //! -//! struct MyType<S> { -//! lock: Lock<S>, -//! } +//! use tokio::sync::Lock; +//! +//! #[tokio::main] +//! async fn main() { +//! let mut data1 = Lock::new(0); +//! let mut data2 = data1.clone(); //! -//! impl<S> Future for MyType<S> -//! where S: Stream<Item = u32> + Send + 'static -//! { -//! type Item = (); -//! type Error = (); +//! tokio::spawn(async move { +//! let mut lock = data2.lock().await; +//! *lock += 1; +//! }); //! -//! fn poll(&mut self) -> Poll<Self::Item, Self::Error> { -//! match self.lock.poll_lock() { -//! Async::Ready(mut guard) => { -//! tokio::spawn(future::poll_fn(move || { -//! let item = try_ready!(guard.poll().map_err(|_| ())); -//! println!("item = {:?}", item); -//! Ok(().into()) -//! })); -//! Ok(().into()) -//! }, -//! Async::NotReady => Ok(Async::NotReady) -//! } -//! } +//! let mut lock = data1.lock().await; +//! *lock += 1; //! } //! ``` //! @@ -43,11 +33,10 @@ use crate::semaphore; use futures_core::ready; +use futures_util::future::poll_fn; use std::cell::UnsafeCell; use std::fmt; -use std::future::Future; use std::ops::{Deref, DerefMut}; -use std::pin::Pin; use std::sync::Arc; use std::task::Poll::Ready; use std::task::{Context, Poll}; @@ -55,8 +44,8 @@ use std::task::{Context, Poll}; /// An asynchronous mutual exclusion primitive useful for protecting shared data /// /// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data -/// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that -/// the data is only ever accessed when the mutex is locked. +/// can only be accessed through the RAII guards returned from `lock`, which +/// guarantees that the data is only ever accessed when the mutex is locked. #[derive(Debug)] pub struct Lock<T> { inner: Arc<State<T>>, @@ -69,17 +58,11 @@ pub struct Lock<T> { /// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes /// away, the guard remains valid. /// -/// The lock is automatically released whenever the guard is dropped, at which point `poll_lock` +/// The lock is automatically released whenever the guard is dropped, at which point `lock` /// will succeed yet again. #[derive(Debug)] pub struct LockGuard<T>(Lock<T>); -/// A future that resolves to a `LockGuard`. -#[derive(Debug)] -pub struct LockFuture<'a, T> { - lock: &'a mut Lock<T>, -} - // As long as T: Send, it's fine to send and share Lock<T> between threads. // If T was not Send, sending and sharing a Lock<T> would be bad, since you can access T through // Lock<T>. @@ -111,10 +94,7 @@ impl<T> Lock<T> { } } - /// Try to acquire the lock. - /// - /// If the lock is already held, the current task is notified when it is released. - pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> { + fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> { ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. @@ -131,8 +111,9 @@ impl<T> Lock<T> { } /// A future that resolves on acquiring the lock and returns the `LockGuard`. - pub fn lock(&mut self) -> LockFuture<'_, T> { - LockFuture { lock: self } + #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn lock(&mut self) -> LockGuard<T> { + poll_fn(|cx| self.poll_lock(cx)).await } } @@ -193,12 +174,3 @@ impl<T: fmt::Display> fmt::Display for LockGuard<T> { fmt::Display::fmt(&**self, f) } } - -impl<T> Future for LockFuture<'_, T> { - type Output = LockGuard<T>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let me = &mut *self; - Pin::new(&mut *me.lock).poll_lock(cx) - } -} diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index 4e32a1ce..7fc19c28 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -82,34 +82,27 @@ pub struct RecvError(()); /// # Examples /// /// ```rust -/// use tokio::sync::mpsc::channel; -/// use tokio::prelude::*; -/// use futures::future::lazy; +/// #![feature(async_await)] /// -/// # fn some_computation() -> impl Future<Item = (), Error = ()> + Send { -/// # futures::future::ok::<(), ()>(()) -/// # } +/// use tokio::sync::mpsc; /// -/// tokio::run(lazy(|| { -/// let (tx, rx) = channel(100); +/// #[tokio::main] +/// async fn main() { +/// let (mut tx, mut rx) = mpsc::channel(100); /// -/// tokio::spawn({ -/// some_computation() -/// .and_then(|value| { -/// tx.send(value) -/// .map_err(|_| ()) -/// }) -/// .map(|_| ()) -/// .map_err(|_| ()) +/// tokio::spawn(async move { +/// for i in 0..10 { +/// if let Err(_) = tx.send(i).await { +/// println!("receiver dropped"); +/// return; +/// } +/// } /// }); /// -/// rx.for_each(|value| { -/// println!("got value = {:?}", value); -/// Ok(()) -/// }) -/// .map(|_| ()) -/// .map_err(|_| ()) -/// })); +/// while let Some(i) = rx.recv().await { +/// println!("got = {}", i); +/// } +/// } /// ``` pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); @@ -134,7 +127,7 @@ impl<T> Receiver<T> { /// TODO: Dox #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 pub async fn recv(&mut self) -> Option<T> { - use async_util::future::poll_fn; + use futures_util::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } @@ -202,12 +195,35 @@ impl<T> Sender<T> { /// /// # Examples /// - /// ``` - /// unimplemented!(); + /// In the following example, each call to `send` will block until the + /// previously sent value was received. + /// + /// ```rust + /// #![feature(async_await)] + /// + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, mut rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// for i in 0..10 { + /// if let Err(_) = tx.send(i).await { + /// println!("receiver dropped"); + /// return; + /// } + /// } + /// }); + /// + /// while let Some(i) = rx.recv().await { + /// println!("got = {}", i); + /// } + /// } /// ``` #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 pub async fn send(&mut self, value: T) -> Result<(), SendError> { - use async_util::future::poll_fn; + use futures_util::future::poll_fn; poll_fn(|cx| self.poll_ready(cx)).await?; diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 9de0ae12..b9fbe08e 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -95,7 +95,7 @@ impl<T> UnboundedReceiver<T> { /// TODO: Dox #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 pub async fn recv(&mut self) -> Option<T> { - use async_util::future::poll_fn; + use futures_util::future::poll_fn; poll_fn(|cx| self.poll_recv(cx)).await } diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index a51f8028..b53e6d4d 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -94,23 +94,25 @@ struct State(usize); /// # Examples /// /// ``` +/// #![feature(async_await)] +/// /// use tokio::sync::oneshot; -/// use futures::Future; -/// use std::thread; /// -/// let (sender, receiver) = oneshot::channel::<i32>(); +/// #[tokio::main] +/// async fn main() { +/// let (tx, rx) = oneshot::channel(); /// -/// # let t = -/// thread::spawn(|| { -/// let future = receiver.map(|i| { -/// println!("got: {:?}", i); +/// tokio::spawn(async move { +/// if let Err(_) = tx.send(3) { +/// println!("the receiver dropped"); +/// } /// }); -/// // ... -/// # return future; -/// }); /// -/// sender.send(3).unwrap(); -/// # t.join().unwrap().wait().unwrap(); +/// match rx.await { +/// Ok(v) => println!("got = {:?}", v), +/// Err(_) => println!("the sender dropped"), +/// } +/// } /// ``` pub fn channel<T>() -> (Sender<T>, Receiver<T>) { #[allow(deprecated)] @@ -218,11 +220,25 @@ impl<T> Sender<T> { /// # Examples /// /// ``` - /// unimplemented!(); + /// #![feature(async_await)] + /// + /// use tokio::sync::oneshot; + /// + /// #[tokio::main] + /// async fn main() { + /// let (mut tx, rx) = oneshot::channel::<()>(); + /// + /// tokio::spawn(async move { + /// drop(rx); + /// }); + /// + /// tx.closed().await; + /// println!("the receiver dropped"); + /// } /// ``` #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 pub async fn closed(&mut self) { - use async_util::future::poll_fn; + use futures_util::future::poll_fn; poll_fn(|cx| self.poll_closed(cx)).await } diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index 6efe437d..0c2bb243 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -18,20 +18,22 @@ //! # Examples //! //! ``` -//! use tokio::prelude::*; +//! #![feature(async_await)] +//! //! use tokio::sync::watch; //! -//! # tokio::run(futures::future::lazy(|| { -//! let (mut tx, rx) = watch::channel("hello"); +//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +//! let (mut tx, mut rx) = watch::channel("hello"); //! -//! tokio::spawn(rx.for_each(|value| { -//! println!("received = {:?}", value); -//! Ok(()) -//! }).map_err(|_| ())); +//! tokio::spawn(async move { +//! while let Some(value) = rx.recv().await { +//! println!("received = {:?}", value); +//! } +//! }); //! -//! tx.broadcast("world").unwrap(); +//! tx.broadcast("world")?; //! # Ok(()) -//! # })); +//! # } //! ``` //! //! # Closing @@ -59,6 +61,8 @@ use core::task::Poll::{Pending, Ready}; use core::task::{Context, Poll}; use fnv::FnvHashMap; use futures_core::ready; +use futures_util::future::poll_fn; +use futures_util::pin_mut; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; @@ -165,20 +169,22 @@ const CLOSED: usize = 1; /// # Examples /// /// ``` -/// use tokio::prelude::*; +/// #![feature(async_await)] +/// /// use tokio::sync::watch; /// -/// # tokio::run(futures::future::lazy(|| { -/// let (mut tx, rx) = watch::channel("hello"); +/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +/// let (mut tx, mut rx) = watch::channel("hello"); /// -/// tokio::spawn(rx.for_each(|value| { -/// println!("received = {:?}", value); -/// Ok(()) -/// }).map_err(|_| ())); +/// tokio::spawn(async move { +/// while let Some(value) = rx.recv().await { +/// println!("received = {:?}", value); +/// } +/// }); /// -/// tx.broadcast("world").unwrap(); +/// tx.broadcast("world")?; /// # Ok(()) -/// # })); +/// # } /// ``` pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) { const INIT_ID: u64 = 0; @@ -241,39 +247,56 @@ impl<T> Receiver<T> { /// /// Only the **most recent** value is returned. If the receiver is falling /// behind the sender, intermediate values are dropped. - pub fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'_, T>>> { - // Make sure the task is up to date - self.inner.waker.register_by_ref(cx.waker()); - - let state = self.shared.version.load(SeqCst); - let version = state & !CLOSED; + #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn recv_ref<'a>(&'a mut self) -> Option<Ref<'a, T>> { + let shared = &self.shared; + let inner = &self.inner; + let version = self.ver; + + match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await { + Some((lock, version)) => { + self.ver = version; + Some(lock) + } + None => None, + } + } +} - if version != self.ver { - // Track the latest version - self.ver = version; +fn poll_lock<'a, T>( + cx: &mut Context<'_>, + shared: &'a Arc<Shared<T>>, + inner: &Arc<WatchInner>, + ver: usize, +) -> Poll<Option<(Ref<'a, T>, usize)>> { + // Make sure the task is up to date + inner.waker.register_by_ref(cx.waker()); - let inner = self.shared.value.read().unwrap(); + let state = shared.version.load(SeqCst); + let version = state & !CLOSED; - return Ready(Some(Ref { inner })); - } + if version != ver { + let inner = shared.value.read().unwrap(); - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - return Ready(None); - } + return Ready(Some((Ref { inner }, version))); + } - Pending + if CLOSED == state & CLOSED { + // The `Store` handle has been dropped. + return Ready(None); } + + Pending } impl<T: Clone> Receiver<T> { /// Attempts to clone the latest value sent via the channel. /// - /// This is equivalent to calling `Clone` on the value returned by `poll_ref`. - #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274 - pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { - let item = ready!(self.poll_ref(cx)); - Ready(item.map(|v_ref| v_ref.clone())) + /// This is equivalent to calling `clone()` on the value returned by + /// `recv()`. + #[allow(clippy::needless_lifetimes, clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn recv(&mut self) -> Option<T> { + self.recv_ref().await.map(|v_ref| v_ref.clone()) } } @@ -282,8 +305,13 @@ impl<T: Clone> futures_core::Stream for Receiver<T> { type Item = T; #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274 - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { - let item = ready!(self.poll_ref(cx)); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> { + use std::future::Future; + + let fut = self.get_mut().recv(); + pin_mut!(fut); + + let item = ready!(fut.poll(cx)); Ready(item.map(|v_ref| v_ref.clone())) } } @@ -354,11 +382,16 @@ impl<T> Sender<T> { Ok(()) } - /// Returns `Ready` when all receivers have dropped. + /// Completes when all receivers have dropped. /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. - pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { + #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988 + pub async fn closed(&mut self) { + poll_fn(|cx| self.poll_close(cx)).await + } + + fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self.shared.upgrade() { Some(shared) => { shared.cancel.register_by_ref(cx.waker()); diff --git a/tokio-sync/tests/atomic_waker.rs b/tokio-sync/tests/atomic_waker.rs index 28a9f40d..ffd3cc26 100644 --- a/tokio-sync/tests/atomic_waker.rs +++ b/tokio-sync/tests/atomic_waker.rs @@ -1,7 +1,7 @@ #![deny(warnings, rust_2018_idioms)] use std::task::Waker; -use tokio_sync::task::AtomicWaker; +use tokio_sync::AtomicWaker; use tokio_test::task::MockTask; trait AssertSend: Send {} diff --git a/tokio-sync/tests/fuzz_atomic_waker.rs b/tokio-sync/tests/fuzz_atomic_waker.rs index 3aebe280..526f1070 100644 --- a/tokio-sync/tests/fuzz_atomic_waker.rs +++ b/tokio-sync/tests/fuzz_atomic_waker.rs @@ -8,7 +8,7 @@ extern crate loom; mod atomic_waker; use crate::atomic_waker::AtomicWaker; -use async_util::future::poll_fn; +use futures_util::future::poll_fn; use loom::futures::block_on; use loom::sync::atomic::AtomicUsize; use loom::thread; diff --git a/tokio-sync/tests/fuzz_mpsc.rs b/tokio-sync/tests/fuzz_mpsc.rs index 0951d88c..0218c3af 100644 --- a/tokio-sync/tests/fuzz_mpsc.rs +++ b/tokio-sync/tests/fuzz_mpsc.rs @@ -18,8 +18,7 @@ mod mpsc; #[allow(warnings)] mod semaphore; -// use futures::{future::poll_fn, Stream}; -use async_util::future::poll_fn; +use futures_util::future::poll_fn; use loom::futures::block_on; use loom::thread; diff --git a/tokio-sync/tests/fuzz_semaphore.rs b/tokio-sync/tests/fuzz_semaphore.rs index f54b99a7..ebc05db2 100644 --- a/tokio-sync/tests/fuzz_semaphore.rs +++ b/tokio-sync/tests/fuzz_semaphore.rs @@ -9,8 +9,8 @@ mod semaphore; use crate::semaphore::*; -use async_util::future::poll_fn; use futures_core::ready; +use futures_util::future::poll_fn; use loom::futures::block_on; use loom::thread; use std::future::Future; diff --git a/tokio-sync/tests/lock.rs b/tokio-sync/tests/lock.rs index 69661531..af318a15 100644 --- a/tokio-sync/tests/lock.rs +++ b/tokio-sync/tests/lock.rs @@ -1,55 +1,57 @@ #![deny(warnings, rust_2018_idioms)] -use pin_utils::pin_mut; -use tokio_sync::lock::Lock; -use tokio_test::task::MockTask; +use tokio_sync::Lock; +use tokio_test::task::spawn; use tokio_test::{assert_pending, assert_ready}; #[test] fn straight_execution() { - let mut task = MockTask::new(); let mut l = Lock::new(100); - // We can immediately acquire the lock and take the value - task.enter(|cx| { - let mut g = assert_ready!(l.poll_lock(cx)); + { + let mut t = spawn(l.lock()); + let mut g = assert_ready!(t.poll()); assert_eq!(&*g, &100); *g = 99; - drop(g); - - let mut g = assert_ready!(l.poll_lock(cx)); + } + { + let mut t = spawn(l.lock()); + let mut g = assert_ready!(t.poll()); assert_eq!(&*g, &99); *g = 98; - drop(g); - - let mut g = assert_ready!(l.poll_lock(cx)); + } + { + let mut t = spawn(l.lock()); + let mut g = assert_ready!(t.poll()); assert_eq!(&*g, &98); // We can continue to access the guard even if the lock is dropped + drop(t); drop(l); *g = 97; assert_eq!(&*g, &97); - }); + } } #[test] fn readiness() { - let mut t1 = MockTask::new(); - let mut t2 = MockTask::new(); - - let mut l = Lock::new(100); + let mut l1 = Lock::new(100); + let mut l2 = l1.clone(); + let mut t1 = spawn(l1.lock()); + let mut t2 = spawn(l2.lock()); - let g = assert_ready!(t1.enter(|cx| l.poll_lock(cx))); + let g = assert_ready!(t1.poll()); // We can't now acquire the lease since it's already held in g - assert_pending!(t2.enter(|cx| l.poll_lock(cx))); + assert_pending!(t2.poll()); // But once g unlocks, we can acquire it drop(g); assert!(t2.is_woken()); - assert_ready!(t2.enter(|cx| l.poll_lock(cx))); + assert_ready!(t2.poll()); } +/* #[test] #[ignore] fn lock() { @@ -79,3 +81,4 @@ fn lock() { let result = assert_ready!(task.poll(&mut l)); assert!(*result); } +*/ diff --git a/tokio-sync/tests/watch.rs b/tokio-sync/tests/watch.rs index 10a6a822..3be30938 100644 --- a/tokio-sync/tests/watch.rs +++ b/tokio-sync/tests/watch.rs @@ -1,7 +1,7 @@ #![deny(warnings, rust_2018_idioms)] use tokio_sync::watch; -use tokio_test::task::MockTask; +use tokio_test::task::spawn; use tokio_test::{assert_pending, assert_ready}; /* @@ -27,110 +27,111 @@ macro_rules! assert_not_ready { */ #[test] -fn single_rx_poll_ref() { +fn single_rx_recv_ref() { let (tx, mut rx) = watch::channel("one"); - let mut task = MockTask::new(); - task.enter(|cx| { - { - let v = assert_ready!(rx.poll_ref(cx)).unwrap(); - assert_eq!(*v, "one"); - } - assert_pending!(rx.poll_ref(cx)); - }); + { + let mut t = spawn(rx.recv_ref()); + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(*v, "one"); + } - tx.broadcast("two").unwrap(); + { + let mut t = spawn(rx.recv_ref()); - assert!(task.is_woken()); + assert_pending!(t.poll()); - task.enter(|cx| { - { - let v = assert_ready!(rx.poll_ref(cx)).unwrap(); - assert_eq!(*v, "two"); - } - assert_pending!(rx.poll_ref(cx)); - }); + tx.broadcast("two").unwrap(); - drop(tx); + assert!(t.is_woken()); + + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(*v, "two"); + } - assert!(task.is_woken()); + { + let mut t = spawn(rx.recv_ref()); - task.enter(|cx| { - let res = assert_ready!(rx.poll_ref(cx)); + assert_pending!(t.poll()); + + drop(tx); + + let res = assert_ready!(t.poll()); assert!(res.is_none()); - }); + } } #[test] -fn single_rx_poll_next() { +fn single_rx_recv() { let (tx, mut rx) = watch::channel("one"); - let mut task = MockTask::new(); - task.enter(|cx| { - let v = assert_ready!(rx.poll_next(cx)).unwrap(); + { + let mut t = spawn(rx.recv()); + let v = assert_ready!(t.poll()).unwrap(); assert_eq!(v, "one"); - assert_pending!(rx.poll_ref(cx)); - }); + } - tx.broadcast("two").unwrap(); + { + let mut t = spawn(rx.recv()); + + assert_pending!(t.poll()); + + tx.broadcast("two").unwrap(); - assert!(task.is_woken()); + assert!(t.is_woken()); - task.enter(|cx| { - let v = assert_ready!(rx.poll_next(cx)).unwrap(); + let v = assert_ready!(t.poll()).unwrap(); assert_eq!(v, "two"); - assert_pending!(rx.poll_ref(cx)); - }); + } - drop(tx); + { + let mut t = spawn(rx.recv()); + + assert_pending!(t.poll()); - assert!(task.is_woken()); + drop(tx); - task.enter(|cx| { - let res = assert_ready!(rx.poll_next(cx)); + let res = assert_ready!(t.poll()); assert!(res.is_none()); - }); + } } #[test] #[cfg(feature = "async-traits")] fn stream_impl() { - use futures_core::Stream; - use pin_utils::pin_mut; + use tokio::prelude::*; - let (tx, rx) = watch::channel("one"); - let mut task = MockTask::new(); + let (tx, mut rx) = watch::channel("one"); - pin_mut!(rx); + { + let mut t = spawn(rx.next()); + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "one"); + } - task.enter(|cx| { - { - let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap(); - assert_eq!(v, "one"); - } - assert_pending!(rx.poll_ref(cx)); - }); + { + let mut t = spawn(rx.next()); - tx.broadcast("two").unwrap(); + assert_pending!(t.poll()); - assert!(task.is_woken()); + tx.broadcast("two").unwrap(); - task.enter(|cx| { - { - let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap(); - assert_eq!(v, "two"); - } - assert_pending!(rx.poll_ref(cx)); - }); + assert!(t.is_woken()); - drop(tx); + let v = assert_ready!(t.poll()).unwrap(); + assert_eq!(v, "two"); + } - assert!(task.is_woken()); + { + let mut t = spawn(rx.next()); - task.enter(|cx| { - let res = assert_ready!(Stream::poll_next(rx, cx)); + assert_pending!(t.poll()); + + drop(tx); + + let res = assert_ready!(t.poll()); assert!(res.is_none()); - }); + } } #[test] @@ -138,67 +139,83 @@ fn multi_rx() { let (tx, mut rx1) = watch::channel("one"); let mut rx2 = rx1.clone(); - let mut task1 = MockTask::new(); - let mut task2 = MockTask::new(); + { + let mut t1 = spawn(rx1.recv_ref()); + let mut t2 = spawn(rx2.recv_ref()); - task1.enter(|cx| { - let res = assert_ready!(rx1.poll_ref(cx)); + let res = assert_ready!(t1.poll()); assert_eq!(*res.unwrap(), "one"); - }); - task2.enter(|cx| { - let res = assert_ready!(rx2.poll_ref(cx)); + let res = assert_ready!(t2.poll()); assert_eq!(*res.unwrap(), "one"); - }); + } - tx.broadcast("two").unwrap(); + let mut t2 = spawn(rx2.recv_ref()); + + { + let mut t1 = spawn(rx1.recv_ref()); + + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); - assert!(task1.is_woken()); - assert!(task2.is_woken()); + tx.broadcast("two").unwrap(); - task1.enter(|cx| { - let res = assert_ready!(rx1.poll_ref(cx)); + assert!(t1.is_woken()); + assert!(t2.is_woken()); + + let res = assert_ready!(t1.poll()); assert_eq!(*res.unwrap(), "two"); - }); + } + + { + let mut t1 = spawn(rx1.recv_ref()); - tx.broadcast("three").unwrap(); + assert_pending!(t1.poll()); - assert!(task1.is_woken()); - assert!(task2.is_woken()); + tx.broadcast("three").unwrap(); - task1.enter(|cx| { - let res = assert_ready!(rx1.poll_ref(cx)); + assert!(t1.is_woken()); + assert!(t2.is_woken()); + + let res = assert_ready!(t1.poll()); assert_eq!(*res.unwrap(), "three"); - }); - task2.enter(|cx| { - let res = assert_ready!(rx2.poll_ref(cx)); + let res = assert_ready!(t2.poll()); assert_eq!(*res.unwrap(), "three"); - }); + } + + drop(t2); - tx.broadcast("four").unwrap(); + { + let mut t1 = spawn(rx1.recv_ref()); + let mut t2 = spawn(rx2.recv_ref()); |