summaryrefslogtreecommitdiffstats
path: root/tokio-sync
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-08-06 13:54:56 -0700
committerGitHub <noreply@github.com>2019-08-06 13:54:56 -0700
commit2f43b0a023f155a3efed4b048f5e0822072840f8 (patch)
tree56412a804ff510e7f0b8d5d00a20b12aabd5d84d /tokio-sync
parent05d00aebb7b8b467571a8cf58cb18ee4de8658c1 (diff)
sync: polish and update API doc examples (#1398)
- Remove `poll_*` fns from some of the sync types. - Move `AtomicWaker` and `Lock` to the root of the `sync` crate.
Diffstat (limited to 'tokio-sync')
-rw-r--r--tokio-sync/Cargo.toml6
-rw-r--r--tokio-sync/src/lib.rs7
-rw-r--r--tokio-sync/src/lock.rs72
-rw-r--r--tokio-sync/src/mpsc/bounded.rs70
-rw-r--r--tokio-sync/src/mpsc/unbounded.rs2
-rw-r--r--tokio-sync/src/oneshot.rs44
-rw-r--r--tokio-sync/src/watch.rs121
-rw-r--r--tokio-sync/tests/atomic_waker.rs2
-rw-r--r--tokio-sync/tests/fuzz_atomic_waker.rs2
-rw-r--r--tokio-sync/tests/fuzz_mpsc.rs3
-rw-r--r--tokio-sync/tests/fuzz_semaphore.rs2
-rw-r--r--tokio-sync/tests/lock.rs45
-rw-r--r--tokio-sync/tests/watch.rs291
13 files changed, 362 insertions, 305 deletions
diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml
index 64c7af83..7e593ca8 100644
--- a/tokio-sync/Cargo.toml
+++ b/tokio-sync/Cargo.toml
@@ -25,10 +25,10 @@ publish = false
async-traits = ["futures-sink-preview"]
[dependencies]
-async-util = { git = "https://github.com/tokio-rs/async" }
fnv = "1.0.6"
-futures-core-preview = { version = "0.3.0-alpha.17" }
-futures-sink-preview = { version = "0.3.0-alpha.17", optional = true }
+futures-core-preview = { version = "= 0.3.0-alpha.17" }
+futures-sink-preview = { version = "= 0.3.0-alpha.17", optional = true }
+futures-util-preview = { version = "= 0.3.0-alpha.17" }
[dev-dependencies]
env_logger = { version = "0.5", default-features = false }
diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs
index 00f9b700..9019abfa 100644
--- a/tokio-sync/src/lib.rs
+++ b/tokio-sync/src/lib.rs
@@ -27,10 +27,13 @@ macro_rules! if_fuzz {
}}
}
-pub mod lock;
+mod lock;
mod loom;
pub mod mpsc;
pub mod oneshot;
pub mod semaphore;
-pub mod task;
+mod task;
pub mod watch;
+
+pub use lock::{Lock, LockGuard};
+pub use task::AtomicWaker;
diff --git a/tokio-sync/src/lock.rs b/tokio-sync/src/lock.rs
index 8b2bc0bc..addef25f 100644
--- a/tokio-sync/src/lock.rs
+++ b/tokio-sync/src/lock.rs
@@ -1,39 +1,29 @@
//! An asynchronous `Mutex`-like type.
//!
//! This module provides [`Lock`], a type that acts similarly to an asynchronous `Mutex`, with one
-//! major difference: the [`LockGuard`] returned by `poll_lock` is not tied to the lifetime of the
+//! major difference: the [`LockGuard`] returned by `lock` is not tied to the lifetime of the
//! `Mutex`. This enables you to acquire a lock, and then pass that guard into a future, and then
//! release it at some later point in time.
//!
//! This allows you to do something along the lines of:
//!
//! ```rust,no_run
-//! use futures::{try_ready, future, Poll, Async, Future, Stream};
-//! use tokio::sync::lock::{Lock, LockGuard};
+//! #![feature(async_await)]
//!
-//! struct MyType<S> {
-//! lock: Lock<S>,
-//! }
+//! use tokio::sync::Lock;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//! let mut data1 = Lock::new(0);
+//! let mut data2 = data1.clone();
//!
-//! impl<S> Future for MyType<S>
-//! where S: Stream<Item = u32> + Send + 'static
-//! {
-//! type Item = ();
-//! type Error = ();
+//! tokio::spawn(async move {
+//! let mut lock = data2.lock().await;
+//! *lock += 1;
+//! });
//!
-//! fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-//! match self.lock.poll_lock() {
-//! Async::Ready(mut guard) => {
-//! tokio::spawn(future::poll_fn(move || {
-//! let item = try_ready!(guard.poll().map_err(|_| ()));
-//! println!("item = {:?}", item);
-//! Ok(().into())
-//! }));
-//! Ok(().into())
-//! },
-//! Async::NotReady => Ok(Async::NotReady)
-//! }
-//! }
+//! let mut lock = data1.lock().await;
+//! *lock += 1;
//! }
//! ```
//!
@@ -43,11 +33,10 @@
use crate::semaphore;
use futures_core::ready;
+use futures_util::future::poll_fn;
use std::cell::UnsafeCell;
use std::fmt;
-use std::future::Future;
use std::ops::{Deref, DerefMut};
-use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll::Ready;
use std::task::{Context, Poll};
@@ -55,8 +44,8 @@ use std::task::{Context, Poll};
/// An asynchronous mutual exclusion primitive useful for protecting shared data
///
/// Each mutex has a type parameter (`T`) which represents the data that it is protecting. The data
-/// can only be accessed through the RAII guards returned from `poll_lock`, which guarantees that
-/// the data is only ever accessed when the mutex is locked.
+/// can only be accessed through the RAII guards returned from `lock`, which
+/// guarantees that the data is only ever accessed when the mutex is locked.
#[derive(Debug)]
pub struct Lock<T> {
inner: Arc<State<T>>,
@@ -69,17 +58,11 @@ pub struct Lock<T> {
/// internally keeps a reference-couned pointer to the original `Lock`, so even if the lock goes
/// away, the guard remains valid.
///
-/// The lock is automatically released whenever the guard is dropped, at which point `poll_lock`
+/// The lock is automatically released whenever the guard is dropped, at which point `lock`
/// will succeed yet again.
#[derive(Debug)]
pub struct LockGuard<T>(Lock<T>);
-/// A future that resolves to a `LockGuard`.
-#[derive(Debug)]
-pub struct LockFuture<'a, T> {
- lock: &'a mut Lock<T>,
-}
-
// As long as T: Send, it's fine to send and share Lock<T> between threads.
// If T was not Send, sending and sharing a Lock<T> would be bad, since you can access T through
// Lock<T>.
@@ -111,10 +94,7 @@ impl<T> Lock<T> {
}
}
- /// Try to acquire the lock.
- ///
- /// If the lock is already held, the current task is notified when it is released.
- pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
+ fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
@@ -131,8 +111,9 @@ impl<T> Lock<T> {
}
/// A future that resolves on acquiring the lock and returns the `LockGuard`.
- pub fn lock(&mut self) -> LockFuture<'_, T> {
- LockFuture { lock: self }
+ #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
+ pub async fn lock(&mut self) -> LockGuard<T> {
+ poll_fn(|cx| self.poll_lock(cx)).await
}
}
@@ -193,12 +174,3 @@ impl<T: fmt::Display> fmt::Display for LockGuard<T> {
fmt::Display::fmt(&**self, f)
}
}
-
-impl<T> Future for LockFuture<'_, T> {
- type Output = LockGuard<T>;
-
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let me = &mut *self;
- Pin::new(&mut *me.lock).poll_lock(cx)
- }
-}
diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs
index 4e32a1ce..7fc19c28 100644
--- a/tokio-sync/src/mpsc/bounded.rs
+++ b/tokio-sync/src/mpsc/bounded.rs
@@ -82,34 +82,27 @@ pub struct RecvError(());
/// # Examples
///
/// ```rust
-/// use tokio::sync::mpsc::channel;
-/// use tokio::prelude::*;
-/// use futures::future::lazy;
+/// #![feature(async_await)]
///
-/// # fn some_computation() -> impl Future<Item = (), Error = ()> + Send {
-/// # futures::future::ok::<(), ()>(())
-/// # }
+/// use tokio::sync::mpsc;
///
-/// tokio::run(lazy(|| {
-/// let (tx, rx) = channel(100);
+/// #[tokio::main]
+/// async fn main() {
+/// let (mut tx, mut rx) = mpsc::channel(100);
///
-/// tokio::spawn({
-/// some_computation()
-/// .and_then(|value| {
-/// tx.send(value)
-/// .map_err(|_| ())
-/// })
-/// .map(|_| ())
-/// .map_err(|_| ())
+/// tokio::spawn(async move {
+/// for i in 0..10 {
+/// if let Err(_) = tx.send(i).await {
+/// println!("receiver dropped");
+/// return;
+/// }
+/// }
/// });
///
-/// rx.for_each(|value| {
-/// println!("got value = {:?}", value);
-/// Ok(())
-/// })
-/// .map(|_| ())
-/// .map_err(|_| ())
-/// }));
+/// while let Some(i) = rx.recv().await {
+/// println!("got = {}", i);
+/// }
+/// }
/// ```
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
@@ -134,7 +127,7 @@ impl<T> Receiver<T> {
/// TODO: Dox
#[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
pub async fn recv(&mut self) -> Option<T> {
- use async_util::future::poll_fn;
+ use futures_util::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
@@ -202,12 +195,35 @@ impl<T> Sender<T> {
///
/// # Examples
///
- /// ```
- /// unimplemented!();
+ /// In the following example, each call to `send` will block until the
+ /// previously sent value was received.
+ ///
+ /// ```rust
+ /// #![feature(async_await)]
+ ///
+ /// use tokio::sync::mpsc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, mut rx) = mpsc::channel(1);
+ ///
+ /// tokio::spawn(async move {
+ /// for i in 0..10 {
+ /// if let Err(_) = tx.send(i).await {
+ /// println!("receiver dropped");
+ /// return;
+ /// }
+ /// }
+ /// });
+ ///
+ /// while let Some(i) = rx.recv().await {
+ /// println!("got = {}", i);
+ /// }
+ /// }
/// ```
#[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
pub async fn send(&mut self, value: T) -> Result<(), SendError> {
- use async_util::future::poll_fn;
+ use futures_util::future::poll_fn;
poll_fn(|cx| self.poll_ready(cx)).await?;
diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs
index 9de0ae12..b9fbe08e 100644
--- a/tokio-sync/src/mpsc/unbounded.rs
+++ b/tokio-sync/src/mpsc/unbounded.rs
@@ -95,7 +95,7 @@ impl<T> UnboundedReceiver<T> {
/// TODO: Dox
#[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
pub async fn recv(&mut self) -> Option<T> {
- use async_util::future::poll_fn;
+ use futures_util::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}
diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs
index a51f8028..b53e6d4d 100644
--- a/tokio-sync/src/oneshot.rs
+++ b/tokio-sync/src/oneshot.rs
@@ -94,23 +94,25 @@ struct State(usize);
/// # Examples
///
/// ```
+/// #![feature(async_await)]
+///
/// use tokio::sync::oneshot;
-/// use futures::Future;
-/// use std::thread;
///
-/// let (sender, receiver) = oneshot::channel::<i32>();
+/// #[tokio::main]
+/// async fn main() {
+/// let (tx, rx) = oneshot::channel();
///
-/// # let t =
-/// thread::spawn(|| {
-/// let future = receiver.map(|i| {
-/// println!("got: {:?}", i);
+/// tokio::spawn(async move {
+/// if let Err(_) = tx.send(3) {
+/// println!("the receiver dropped");
+/// }
/// });
-/// // ...
-/// # return future;
-/// });
///
-/// sender.send(3).unwrap();
-/// # t.join().unwrap().wait().unwrap();
+/// match rx.await {
+/// Ok(v) => println!("got = {:?}", v),
+/// Err(_) => println!("the sender dropped"),
+/// }
+/// }
/// ```
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
#[allow(deprecated)]
@@ -218,11 +220,25 @@ impl<T> Sender<T> {
/// # Examples
///
/// ```
- /// unimplemented!();
+ /// #![feature(async_await)]
+ ///
+ /// use tokio::sync::oneshot;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (mut tx, rx) = oneshot::channel::<()>();
+ ///
+ /// tokio::spawn(async move {
+ /// drop(rx);
+ /// });
+ ///
+ /// tx.closed().await;
+ /// println!("the receiver dropped");
+ /// }
/// ```
#[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
pub async fn closed(&mut self) {
- use async_util::future::poll_fn;
+ use futures_util::future::poll_fn;
poll_fn(|cx| self.poll_closed(cx)).await
}
diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs
index 6efe437d..0c2bb243 100644
--- a/tokio-sync/src/watch.rs
+++ b/tokio-sync/src/watch.rs
@@ -18,20 +18,22 @@
//! # Examples
//!
//! ```
-//! use tokio::prelude::*;
+//! #![feature(async_await)]
+//!
//! use tokio::sync::watch;
//!
-//! # tokio::run(futures::future::lazy(|| {
-//! let (mut tx, rx) = watch::channel("hello");
+//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+//! let (mut tx, mut rx) = watch::channel("hello");
//!
-//! tokio::spawn(rx.for_each(|value| {
-//! println!("received = {:?}", value);
-//! Ok(())
-//! }).map_err(|_| ()));
+//! tokio::spawn(async move {
+//! while let Some(value) = rx.recv().await {
+//! println!("received = {:?}", value);
+//! }
+//! });
//!
-//! tx.broadcast("world").unwrap();
+//! tx.broadcast("world")?;
//! # Ok(())
-//! # }));
+//! # }
//! ```
//!
//! # Closing
@@ -59,6 +61,8 @@ use core::task::Poll::{Pending, Ready};
use core::task::{Context, Poll};
use fnv::FnvHashMap;
use futures_core::ready;
+use futures_util::future::poll_fn;
+use futures_util::pin_mut;
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
@@ -165,20 +169,22 @@ const CLOSED: usize = 1;
/// # Examples
///
/// ```
-/// use tokio::prelude::*;
+/// #![feature(async_await)]
+///
/// use tokio::sync::watch;
///
-/// # tokio::run(futures::future::lazy(|| {
-/// let (mut tx, rx) = watch::channel("hello");
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let (mut tx, mut rx) = watch::channel("hello");
///
-/// tokio::spawn(rx.for_each(|value| {
-/// println!("received = {:?}", value);
-/// Ok(())
-/// }).map_err(|_| ()));
+/// tokio::spawn(async move {
+/// while let Some(value) = rx.recv().await {
+/// println!("received = {:?}", value);
+/// }
+/// });
///
-/// tx.broadcast("world").unwrap();
+/// tx.broadcast("world")?;
/// # Ok(())
-/// # }));
+/// # }
/// ```
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
const INIT_ID: u64 = 0;
@@ -241,39 +247,56 @@ impl<T> Receiver<T> {
///
/// Only the **most recent** value is returned. If the receiver is falling
/// behind the sender, intermediate values are dropped.
- pub fn poll_ref(&mut self, cx: &mut Context<'_>) -> Poll<Option<Ref<'_, T>>> {
- // Make sure the task is up to date
- self.inner.waker.register_by_ref(cx.waker());
-
- let state = self.shared.version.load(SeqCst);
- let version = state & !CLOSED;
+ #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
+ pub async fn recv_ref<'a>(&'a mut self) -> Option<Ref<'a, T>> {
+ let shared = &self.shared;
+ let inner = &self.inner;
+ let version = self.ver;
+
+ match poll_fn(|cx| poll_lock(cx, shared, inner, version)).await {
+ Some((lock, version)) => {
+ self.ver = version;
+ Some(lock)
+ }
+ None => None,
+ }
+ }
+}
- if version != self.ver {
- // Track the latest version
- self.ver = version;
+fn poll_lock<'a, T>(
+ cx: &mut Context<'_>,
+ shared: &'a Arc<Shared<T>>,
+ inner: &Arc<WatchInner>,
+ ver: usize,
+) -> Poll<Option<(Ref<'a, T>, usize)>> {
+ // Make sure the task is up to date
+ inner.waker.register_by_ref(cx.waker());
- let inner = self.shared.value.read().unwrap();
+ let state = shared.version.load(SeqCst);
+ let version = state & !CLOSED;
- return Ready(Some(Ref { inner }));
- }
+ if version != ver {
+ let inner = shared.value.read().unwrap();
- if CLOSED == state & CLOSED {
- // The `Store` handle has been dropped.
- return Ready(None);
- }
+ return Ready(Some((Ref { inner }, version)));
+ }
- Pending
+ if CLOSED == state & CLOSED {
+ // The `Store` handle has been dropped.
+ return Ready(None);
}
+
+ Pending
}
impl<T: Clone> Receiver<T> {
/// Attempts to clone the latest value sent via the channel.
///
- /// This is equivalent to calling `Clone` on the value returned by `poll_ref`.
- #[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274
- pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
- let item = ready!(self.poll_ref(cx));
- Ready(item.map(|v_ref| v_ref.clone()))
+ /// This is equivalent to calling `clone()` on the value returned by
+ /// `recv()`.
+ #[allow(clippy::needless_lifetimes, clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
+ pub async fn recv(&mut self) -> Option<T> {
+ self.recv_ref().await.map(|v_ref| v_ref.clone())
}
}
@@ -282,8 +305,13 @@ impl<T: Clone> futures_core::Stream for Receiver<T> {
type Item = T;
#[allow(clippy::map_clone)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3274
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
- let item = ready!(self.poll_ref(cx));
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ use std::future::Future;
+
+ let fut = self.get_mut().recv();
+ pin_mut!(fut);
+
+ let item = ready!(fut.poll(cx));
Ready(item.map(|v_ref| v_ref.clone()))
}
}
@@ -354,11 +382,16 @@ impl<T> Sender<T> {
Ok(())
}
- /// Returns `Ready` when all receivers have dropped.
+ /// Completes when all receivers have dropped.
///
/// This allows the producer to get notified when interest in the produced
/// values is canceled and immediately stop doing work.
- pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ #[allow(clippy::needless_lifetimes)] // false positive: https://github.com/rust-lang/rust-clippy/issues/3988
+ pub async fn closed(&mut self) {
+ poll_fn(|cx| self.poll_close(cx)).await
+ }
+
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self.shared.upgrade() {
Some(shared) => {
shared.cancel.register_by_ref(cx.waker());
diff --git a/tokio-sync/tests/atomic_waker.rs b/tokio-sync/tests/atomic_waker.rs
index 28a9f40d..ffd3cc26 100644
--- a/tokio-sync/tests/atomic_waker.rs
+++ b/tokio-sync/tests/atomic_waker.rs
@@ -1,7 +1,7 @@
#![deny(warnings, rust_2018_idioms)]
use std::task::Waker;
-use tokio_sync::task::AtomicWaker;
+use tokio_sync::AtomicWaker;
use tokio_test::task::MockTask;
trait AssertSend: Send {}
diff --git a/tokio-sync/tests/fuzz_atomic_waker.rs b/tokio-sync/tests/fuzz_atomic_waker.rs
index 3aebe280..526f1070 100644
--- a/tokio-sync/tests/fuzz_atomic_waker.rs
+++ b/tokio-sync/tests/fuzz_atomic_waker.rs
@@ -8,7 +8,7 @@ extern crate loom;
mod atomic_waker;
use crate::atomic_waker::AtomicWaker;
-use async_util::future::poll_fn;
+use futures_util::future::poll_fn;
use loom::futures::block_on;
use loom::sync::atomic::AtomicUsize;
use loom::thread;
diff --git a/tokio-sync/tests/fuzz_mpsc.rs b/tokio-sync/tests/fuzz_mpsc.rs
index 0951d88c..0218c3af 100644
--- a/tokio-sync/tests/fuzz_mpsc.rs
+++ b/tokio-sync/tests/fuzz_mpsc.rs
@@ -18,8 +18,7 @@ mod mpsc;
#[allow(warnings)]
mod semaphore;
-// use futures::{future::poll_fn, Stream};
-use async_util::future::poll_fn;
+use futures_util::future::poll_fn;
use loom::futures::block_on;
use loom::thread;
diff --git a/tokio-sync/tests/fuzz_semaphore.rs b/tokio-sync/tests/fuzz_semaphore.rs
index f54b99a7..ebc05db2 100644
--- a/tokio-sync/tests/fuzz_semaphore.rs
+++ b/tokio-sync/tests/fuzz_semaphore.rs
@@ -9,8 +9,8 @@ mod semaphore;
use crate::semaphore::*;
-use async_util::future::poll_fn;
use futures_core::ready;
+use futures_util::future::poll_fn;
use loom::futures::block_on;
use loom::thread;
use std::future::Future;
diff --git a/tokio-sync/tests/lock.rs b/tokio-sync/tests/lock.rs
index 69661531..af318a15 100644
--- a/tokio-sync/tests/lock.rs
+++ b/tokio-sync/tests/lock.rs
@@ -1,55 +1,57 @@
#![deny(warnings, rust_2018_idioms)]
-use pin_utils::pin_mut;
-use tokio_sync::lock::Lock;
-use tokio_test::task::MockTask;
+use tokio_sync::Lock;
+use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
#[test]
fn straight_execution() {
- let mut task = MockTask::new();
let mut l = Lock::new(100);
- // We can immediately acquire the lock and take the value
- task.enter(|cx| {
- let mut g = assert_ready!(l.poll_lock(cx));
+ {
+ let mut t = spawn(l.lock());
+ let mut g = assert_ready!(t.poll());
assert_eq!(&*g, &100);
*g = 99;
- drop(g);
-
- let mut g = assert_ready!(l.poll_lock(cx));
+ }
+ {
+ let mut t = spawn(l.lock());
+ let mut g = assert_ready!(t.poll());
assert_eq!(&*g, &99);
*g = 98;
- drop(g);
-
- let mut g = assert_ready!(l.poll_lock(cx));
+ }
+ {
+ let mut t = spawn(l.lock());
+ let mut g = assert_ready!(t.poll());
assert_eq!(&*g, &98);
// We can continue to access the guard even if the lock is dropped
+ drop(t);
drop(l);
*g = 97;
assert_eq!(&*g, &97);
- });
+ }
}
#[test]
fn readiness() {
- let mut t1 = MockTask::new();
- let mut t2 = MockTask::new();
-
- let mut l = Lock::new(100);
+ let mut l1 = Lock::new(100);
+ let mut l2 = l1.clone();
+ let mut t1 = spawn(l1.lock());
+ let mut t2 = spawn(l2.lock());
- let g = assert_ready!(t1.enter(|cx| l.poll_lock(cx)));
+ let g = assert_ready!(t1.poll());
// We can't now acquire the lease since it's already held in g
- assert_pending!(t2.enter(|cx| l.poll_lock(cx)));
+ assert_pending!(t2.poll());
// But once g unlocks, we can acquire it
drop(g);
assert!(t2.is_woken());
- assert_ready!(t2.enter(|cx| l.poll_lock(cx)));
+ assert_ready!(t2.poll());
}
+/*
#[test]
#[ignore]
fn lock() {
@@ -79,3 +81,4 @@ fn lock() {
let result = assert_ready!(task.poll(&mut l));
assert!(*result);
}
+*/
diff --git a/tokio-sync/tests/watch.rs b/tokio-sync/tests/watch.rs
index 10a6a822..3be30938 100644
--- a/tokio-sync/tests/watch.rs
+++ b/tokio-sync/tests/watch.rs
@@ -1,7 +1,7 @@
#![deny(warnings, rust_2018_idioms)]
use tokio_sync::watch;
-use tokio_test::task::MockTask;
+use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};
/*
@@ -27,110 +27,111 @@ macro_rules! assert_not_ready {
*/
#[test]
-fn single_rx_poll_ref() {
+fn single_rx_recv_ref() {
let (tx, mut rx) = watch::channel("one");
- let mut task = MockTask::new();
- task.enter(|cx| {
- {
- let v = assert_ready!(rx.poll_ref(cx)).unwrap();
- assert_eq!(*v, "one");
- }
- assert_pending!(rx.poll_ref(cx));
- });
+ {
+ let mut t = spawn(rx.recv_ref());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(*v, "one");
+ }
- tx.broadcast("two").unwrap();
+ {
+ let mut t = spawn(rx.recv_ref());
- assert!(task.is_woken());
+ assert_pending!(t.poll());
- task.enter(|cx| {
- {
- let v = assert_ready!(rx.poll_ref(cx)).unwrap();
- assert_eq!(*v, "two");
- }
- assert_pending!(rx.poll_ref(cx));
- });
+ tx.broadcast("two").unwrap();
- drop(tx);
+ assert!(t.is_woken());
+
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(*v, "two");
+ }
- assert!(task.is_woken());
+ {
+ let mut t = spawn(rx.recv_ref());
- task.enter(|cx| {
- let res = assert_ready!(rx.poll_ref(cx));
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
assert!(res.is_none());
- });
+ }
}
#[test]
-fn single_rx_poll_next() {
+fn single_rx_recv() {
let (tx, mut rx) = watch::channel("one");
- let mut task = MockTask::new();
- task.enter(|cx| {
- let v = assert_ready!(rx.poll_next(cx)).unwrap();
+ {
+ let mut t = spawn(rx.recv());
+ let v = assert_ready!(t.poll()).unwrap();
assert_eq!(v, "one");
- assert_pending!(rx.poll_ref(cx));
- });
+ }
- tx.broadcast("two").unwrap();
+ {
+ let mut t = spawn(rx.recv());
+
+ assert_pending!(t.poll());
+
+ tx.broadcast("two").unwrap();
- assert!(task.is_woken());
+ assert!(t.is_woken());
- task.enter(|cx| {
- let v = assert_ready!(rx.poll_next(cx)).unwrap();
+ let v = assert_ready!(t.poll()).unwrap();
assert_eq!(v, "two");
- assert_pending!(rx.poll_ref(cx));
- });
+ }
- drop(tx);
+ {
+ let mut t = spawn(rx.recv());
+
+ assert_pending!(t.poll());
- assert!(task.is_woken());
+ drop(tx);
- task.enter(|cx| {
- let res = assert_ready!(rx.poll_next(cx));
+ let res = assert_ready!(t.poll());
assert!(res.is_none());
- });
+ }
}
#[test]
#[cfg(feature = "async-traits")]
fn stream_impl() {
- use futures_core::Stream;
- use pin_utils::pin_mut;
+ use tokio::prelude::*;
- let (tx, rx) = watch::channel("one");
- let mut task = MockTask::new();
+ let (tx, mut rx) = watch::channel("one");
- pin_mut!(rx);
+ {
+ let mut t = spawn(rx.next());
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "one");
+ }
- task.enter(|cx| {
- {
- let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap();
- assert_eq!(v, "one");
- }
- assert_pending!(rx.poll_ref(cx));
- });
+ {
+ let mut t = spawn(rx.next());
- tx.broadcast("two").unwrap();
+ assert_pending!(t.poll());
- assert!(task.is_woken());
+ tx.broadcast("two").unwrap();
- task.enter(|cx| {
- {
- let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap();
- assert_eq!(v, "two");
- }
- assert_pending!(rx.poll_ref(cx));
- });
+ assert!(t.is_woken());
- drop(tx);
+ let v = assert_ready!(t.poll()).unwrap();
+ assert_eq!(v, "two");
+ }
- assert!(task.is_woken());
+ {
+ let mut t = spawn(rx.next());
- task.enter(|cx| {
- let res = assert_ready!(Stream::poll_next(rx, cx));
+ assert_pending!(t.poll());
+
+ drop(tx);
+
+ let res = assert_ready!(t.poll());
assert!(res.is_none());
- });
+ }
}
#[test]
@@ -138,67 +139,83 @@ fn multi_rx() {
let (tx, mut rx1) = watch::channel("one");
let mut rx2 = rx1.clone();
- let mut task1 = MockTask::new();
- let mut task2 = MockTask::new();
+ {
+ let mut t1 = spawn(rx1.recv_ref());
+ let mut t2 = spawn(rx2.recv_ref());
- task1.enter(|cx| {
- let res = assert_ready!(rx1.poll_ref(cx));
+ let res = assert_ready!(t1.poll());
assert_eq!(*res.unwrap(), "one");
- });
- task2.enter(|cx| {
- let res = assert_ready!(rx2.poll_ref(cx));
+ let res = assert_ready!(t2.poll());
assert_eq!(*res.unwrap(), "one");
- });
+ }
- tx.broadcast("two").unwrap();
+ let mut t2 = spawn(rx2.recv_ref());
+
+ {
+ let mut t1 = spawn(rx1.recv_ref());
+
+ assert_pending!(t1.poll());
+ assert_pending!(t2.poll());
- assert!(task1.is_woken());
- assert!(task2.is_woken());
+ tx.broadcast("two").unwrap();
- task1.enter(|cx| {
- let res = assert_ready!(rx1.poll_ref(cx));
+ assert!(t1.is_woken());
+ assert!(t2.is_woken());
+
+ let res = assert_ready!(t1.poll());
assert_eq!(*res.unwrap(), "two");
- });
+ }
+
+ {
+ let mut t1 = spawn(rx1.recv_ref());
- tx.broadcast("three").unwrap();
+ assert_pending!(t1.poll());
- assert!(task1.is_woken());
- assert!(task2.is_woken());
+ tx.broadcast("three").unwrap();
- task1.enter(|cx| {
- let res = assert_ready!(rx1.poll_ref(cx));
+ assert!(t1.is_woken());
+ assert!(t2.is_woken());
+
+ let res = assert_ready!(t1.poll());
assert_eq!(*res.unwrap(), "three");
- });
- task2.enter(|cx| {
- let res = assert_ready!(rx2.poll_ref(cx));
+ let res = assert_ready!(t2.poll());
assert_eq!(*res.unwrap(), "three");
- });
+ }
+
+ drop(t2);
- tx.broadcast("four").unwrap();
+ {
+ let mut t1 = spawn(rx1.recv_ref());
+ let mut t2 = spawn(rx2.recv_ref());
- task1.enter(|cx| {
- let res = assert_ready!(rx1.poll_ref(cx));
+ assert_pending!(t1.poll());
+ assert_pending!(t2.poll());
+
+ tx.broadcast("four").unwrap();
+
+ let res = assert_ready!(t1.poll());
assert_eq!(*res.unwrap(), "four");
- });
+ drop(t1);
- drop(tx);
+ let mut t1 = spawn(rx1.recv_ref());
+ assert_pending!(t1.poll());
- task1.enter(|cx| {
- let res = assert_ready!(rx1.poll_ref(cx));
+ drop(tx);
+
+ assert!(t1.is_woken());
+ let res = assert_ready!(t1.poll());
assert!(res.is_none());
- });
- task2.enter(|cx| {
- let res = assert_ready!(rx2.poll_ref(cx));
+ let res = assert_ready!(t2.poll());