diff options
author | Carl Lerche <me@carllerche.com> | 2020-05-03 12:35:47 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-03 12:35:47 -0700 |
commit | 264ae3bdb22004609de45b67e2890081bb47e5b2 (patch) | |
tree | c9f6005fef1c68de7331a98be3d18e2c112400e6 /tokio/src/sync | |
parent | 187af2e6a323be4193c82ad95f9aa32d2ae16869 (diff) |
sync: move CancellationToken tests (#2477)
In preparation of work on `CancellationToken` internals, the tests are
moved into `tests/` and are updated to not depend on internals.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r-- | tokio/src/sync/cancellation_token.rs | 389 | ||||
-rw-r--r-- | tokio/src/sync/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/sync/tests/loom_cancellation_token.rs | 2 |
3 files changed, 37 insertions, 362 deletions
diff --git a/tokio/src/sync/cancellation_token.rs b/tokio/src/sync/cancellation_token.rs index 66c302ca..d60d8e02 100644 --- a/tokio/src/sync/cancellation_token.rs +++ b/tokio/src/sync/cancellation_token.rs @@ -1,17 +1,15 @@ //! An asynchronously awaitable `CancellationToken`. //! The token allows to signal a cancellation request to one or more tasks. -use crate::{ - loom::sync::{atomic::AtomicUsize, Mutex}, - util::intrusive_double_linked_list::{LinkedList, ListNode}, -}; -use core::{ - future::Future, - pin::Pin, - ptr::NonNull, - sync::atomic::Ordering, - task::{Context, Poll, Waker}, -}; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; +use crate::util::intrusive_double_linked_list::{LinkedList, ListNode}; + +use core::future::Future; +use core::pin::Pin; +use core::ptr::NonNull; +use core::sync::atomic::Ordering; +use core::task::{Context, Poll, Waker}; /// A token which can be used to signal a cancellation request to one or more /// tasks. @@ -62,6 +60,25 @@ pub struct CancellationToken { unsafe impl Send for CancellationToken {} unsafe impl Sync for CancellationToken {} +/// A Future that is resolved once the corresponding [`CancellationToken`] +/// was cancelled +#[must_use = "futures do nothing unless polled"] +pub struct WaitForCancellationFuture<'a> { + /// The CancellationToken that is associated with this WaitForCancellationFuture + cancellation_token: Option<&'a CancellationToken>, + /// Node for waiting at the cancellation_token + wait_node: ListNode<WaitQueueEntry>, + /// Whether this future was registered at the token yet as a waiter + is_registered: bool, +} + +// Safety: Futures can be sent between threads as long as the underlying +// cancellation_token is thread-safe (Sync), +// which allows to poll/register/unregister from a different thread. +unsafe impl<'a> Send for WaitForCancellationFuture<'a> {} + +// ===== impl CancellationToken ===== + impl core::fmt::Debug for CancellationToken { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { f.debug_struct("CancellationToken") @@ -114,8 +131,7 @@ impl Drop for CancellationToken { impl CancellationToken { /// Creates a new CancellationToken in the non-cancelled state. - #[allow(dead_code)] - pub(crate) fn new() -> CancellationToken { + pub fn new() -> CancellationToken { let state = Box::new(CancellationTokenState::new( None, StateSnapshot { @@ -177,8 +193,7 @@ impl CancellationToken { /// assert_eq!(5, join_handle.await.unwrap()); /// } /// ``` - #[allow(dead_code)] - pub(crate) fn child_token(&self) -> CancellationToken { + pub fn child_token(&self) -> CancellationToken { let inner = self.state(); // Increment the refcount of this token. It will be referenced by the @@ -232,31 +247,11 @@ impl CancellationToken { } } - /// Returns the number of child tokens - #[cfg(all(test, not(loom)))] - fn child_tokens(&self) -> usize { - let mut result = 0; - let inner = self.state(); - - let guard = inner.synchronized.lock().unwrap(); - let mut child = guard.first_child; - while let Some(mut c) = child { - result += 1; - // Safety: The child state is accessed from within a Mutex. Since - // the child needs to take the Mutex to unregister itself before - // getting destroyed, it is guaranteed to be alive. - child = unsafe { c.as_mut().from_parent.next_peer }; - } - - result - } - /// Cancel the [`CancellationToken`] and all child tokens which had been /// derived from it. /// /// This will wake up all tasks which are waiting for cancellation. - #[allow(dead_code)] - pub(crate) fn cancel(&self) { + pub fn cancel(&self) { self.state().cancel(); } @@ -295,22 +290,7 @@ impl CancellationToken { } } -/// A Future that is resolved once the corresponding [`CancellationToken`] -/// was cancelled -#[must_use = "futures do nothing unless polled"] -pub struct WaitForCancellationFuture<'a> { - /// The CancellationToken that is associated with this WaitForCancellationFuture - cancellation_token: Option<&'a CancellationToken>, - /// Node for waiting at the cancellation_token - wait_node: ListNode<WaitQueueEntry>, - /// Whether this future was registered at the token yet as a waiter - is_registered: bool, -} - -// Safety: Futures can be sent between threads as long as the underlying -// cancellation_token is thread-safe (Sync), -// which allows to poll/register/unregister from a different thread. -unsafe impl<'a> Send for WaitForCancellationFuture<'a> {} +// ===== impl WaitForCancellationFuture ===== impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { @@ -879,308 +859,3 @@ impl CancellationTokenState { wait_node.task = None; } } - -#[cfg(all(test, not(loom)))] -mod tests { - use super::*; - use crate::pin; - use futures_test::task::new_count_waker; - - #[test] - fn cancel_token() { - let (waker, wake_counter) = new_count_waker(); - let token = CancellationToken::new(); - assert_eq!(false, token.is_cancelled()); - - let wait_fut = token.cancelled(); - pin!(wait_fut); - - assert_eq!( - Poll::Pending, - wait_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!(wake_counter, 0); - - let wait_fut_2 = token.cancelled(); - pin!(wait_fut_2); - - token.cancel(); - assert_eq!(wake_counter, 1); - assert_eq!(true, token.is_cancelled()); - - assert_eq!( - Poll::Ready(()), - wait_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Ready(()), - wait_fut_2.as_mut().poll(&mut Context::from_waker(&waker)) - ); - } - - #[test] - fn cancel_child_token_through_parent() { - let (waker, wake_counter) = new_count_waker(); - let token = CancellationToken::new(); - - let child_token = token.child_token(); - assert_eq!( - StateSnapshot { - refcount: 1, - has_parent_ref: true, - cancel_state: CancellationState::NotCancelled, - }, - child_token.state().snapshot() - ); - - let child_fut = child_token.cancelled(); - pin!(child_fut); - let parent_fut = token.cancelled(); - pin!(parent_fut); - - assert_eq!( - Poll::Pending, - child_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Pending, - parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!(wake_counter, 0); - - token.cancel(); - assert_eq!(wake_counter, 2); - assert_eq!(true, token.is_cancelled()); - assert_eq!(true, child_token.is_cancelled()); - assert_eq!( - StateSnapshot { - refcount: 1, - has_parent_ref: false, - cancel_state: CancellationState::Cancelled, - }, - child_token.state().snapshot() - ); - - assert_eq!( - Poll::Ready(()), - child_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Ready(()), - parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - } - - #[test] - fn cancel_child_token_without_parent() { - let (waker, wake_counter) = new_count_waker(); - let token = CancellationToken::new(); - - let child_token_1 = token.child_token(); - - let child_fut = child_token_1.cancelled(); - pin!(child_fut); - let parent_fut = token.cancelled(); - pin!(parent_fut); - - assert_eq!( - Poll::Pending, - child_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Pending, - parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!(wake_counter, 0); - - child_token_1.cancel(); - assert_eq!(wake_counter, 1); - assert_eq!(false, token.is_cancelled()); - assert_eq!(true, child_token_1.is_cancelled()); - assert_eq!( - StateSnapshot { - refcount: 1, - has_parent_ref: true, - cancel_state: CancellationState::Cancelled, - }, - child_token_1.state().snapshot() - ); - - assert_eq!( - Poll::Ready(()), - child_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Pending, - parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - - let child_token_2 = token.child_token(); - let child_fut_2 = child_token_2.cancelled(); - pin!(child_fut_2); - - assert_eq!( - Poll::Pending, - child_fut_2.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Pending, - parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - - token.cancel(); - assert_eq!(wake_counter, 3); - assert_eq!(true, token.is_cancelled()); - assert_eq!(true, child_token_2.is_cancelled()); - - assert_eq!( - Poll::Ready(()), - child_fut_2.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Ready(()), - parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - } - - #[test] - fn create_child_token_after_parent_was_cancelled() { - for drop_child_first in [true, false].iter().cloned() { - let (waker, wake_counter) = new_count_waker(); - let token = CancellationToken::new(); - token.cancel(); - - let child_token = token.child_token(); - assert_eq!( - StateSnapshot { - refcount: 1, - has_parent_ref: false, - cancel_state: CancellationState::Cancelled, - }, - child_token.state().snapshot() - ); - - { - let child_fut = child_token.cancelled(); - pin!(child_fut); - let parent_fut = token.cancelled(); - pin!(parent_fut); - - assert_eq!( - Poll::Ready(()), - child_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!( - Poll::Ready(()), - parent_fut.as_mut().poll(&mut Context::from_waker(&waker)) - ); - assert_eq!(wake_counter, 0); - - drop(child_fut); - drop(parent_fut); - } - - if drop_child_first { - drop(child_token); - drop(token); - } else { - drop(token); - drop(child_token); - } - } - } - - #[test] - fn drop_multiple_child_tokens() { - for drop_first_child_first in &[true, false] { - let token = CancellationToken::new(); - let mut child_tokens = [None, None, None]; - for i in 0..child_tokens.len() { - child_tokens[i] = Some(token.child_token()); - } - assert_eq!(child_tokens.len(), token.child_tokens()); - - assert_eq!( - StateSnapshot { - refcount: child_tokens.len() + 1, - has_parent_ref: false, - cancel_state: CancellationState::NotCancelled, - }, - token.state().snapshot() - ); - assert_eq!( - StateSnapshot { - refcount: 1, - has_parent_ref: true, - cancel_state: CancellationState::NotCancelled, - }, - child_tokens[0].as_ref().unwrap().state().snapshot() - ); - - let mut remaining_childs = child_tokens.len(); - for i in 0..child_tokens.len() { - if *drop_first_child_first { - child_tokens[i] = None; - } else { - child_tokens[child_tokens.len() - 1 - i] = None; - } - remaining_childs -= 1; - assert_eq!( - StateSnapshot { - refcount: remaining_childs + 1, - has_parent_ref: false, - cancel_state: CancellationState::NotCancelled, - }, - token.state().snapshot() - ); - assert_eq!(remaining_childs, token.child_tokens()); - } - - drop(token); - } - } - - #[test] - fn drop_parent_before_child_tokens() { - let token = CancellationToken::new(); - let child1 = token.child_token(); - let child2 = token.child_token(); - - drop(token); - assert_eq!( - StateSnapshot { - refcount: 1, - has_parent_ref: true, - cancel_state: CancellationState::NotCancelled, - }, - child1.state().snapshot() - ); - - drop(child1); - drop(child2); - } - - #[test] - fn parent_refcoutn_only_decreases_if_all_child_clones_are_released() { - let token = CancellationToken::new(); - let child1 = token.child_token(); - let child2 = child1.clone(); - - assert_eq!(2, token.state().snapshot().refcount); - - drop(child1); - assert_eq!(2, token.state().snapshot().refcount); - drop(child2); - assert_eq!(1, token.state().snapshot().refcount); - } - - #[test] - fn cancellation_future_is_send() { - let token = CancellationToken::new(); - let fut = token.cancelled(); - - fn with_send<T: Send>(_: T) {} - - with_send(fut); - } -} diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index b2c66e44..e558634f 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -425,10 +425,10 @@ cfg_sync! { pub mod broadcast; - #[cfg(tokio_unstable)] - mod cancellation_token; - #[cfg(tokio_unstable)] - pub use cancellation_token::{CancellationToken, WaitForCancellationFuture}; + cfg_unstable! { + mod cancellation_token; + pub use cancellation_token::{CancellationToken, WaitForCancellationFuture}; + } pub mod mpsc; diff --git a/tokio/src/sync/tests/loom_cancellation_token.rs b/tokio/src/sync/tests/loom_cancellation_token.rs index aac84f80..e9c9f3dd 100644 --- a/tokio/src/sync/tests/loom_cancellation_token.rs +++ b/tokio/src/sync/tests/loom_cancellation_token.rs @@ -1,4 +1,4 @@ -use crate::scope::CancellationToken; +use crate::sync::CancellationToken; use loom::{future::block_on, thread}; use tokio_test::assert_ok; |