summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-05-03 12:35:47 -0700
committerGitHub <noreply@github.com>2020-05-03 12:35:47 -0700
commit264ae3bdb22004609de45b67e2890081bb47e5b2 (patch)
treec9f6005fef1c68de7331a98be3d18e2c112400e6 /tokio/src/sync
parent187af2e6a323be4193c82ad95f9aa32d2ae16869 (diff)
sync: move CancellationToken tests (#2477)
In preparation of work on `CancellationToken` internals, the tests are moved into `tests/` and are updated to not depend on internals.
Diffstat (limited to 'tokio/src/sync')
-rw-r--r--tokio/src/sync/cancellation_token.rs389
-rw-r--r--tokio/src/sync/mod.rs8
-rw-r--r--tokio/src/sync/tests/loom_cancellation_token.rs2
3 files changed, 37 insertions, 362 deletions
diff --git a/tokio/src/sync/cancellation_token.rs b/tokio/src/sync/cancellation_token.rs
index 66c302ca..d60d8e02 100644
--- a/tokio/src/sync/cancellation_token.rs
+++ b/tokio/src/sync/cancellation_token.rs
@@ -1,17 +1,15 @@
//! An asynchronously awaitable `CancellationToken`.
//! The token allows to signal a cancellation request to one or more tasks.
-use crate::{
- loom::sync::{atomic::AtomicUsize, Mutex},
- util::intrusive_double_linked_list::{LinkedList, ListNode},
-};
-use core::{
- future::Future,
- pin::Pin,
- ptr::NonNull,
- sync::atomic::Ordering,
- task::{Context, Poll, Waker},
-};
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Mutex;
+use crate::util::intrusive_double_linked_list::{LinkedList, ListNode};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::ptr::NonNull;
+use core::sync::atomic::Ordering;
+use core::task::{Context, Poll, Waker};
/// A token which can be used to signal a cancellation request to one or more
/// tasks.
@@ -62,6 +60,25 @@ pub struct CancellationToken {
unsafe impl Send for CancellationToken {}
unsafe impl Sync for CancellationToken {}
+/// A Future that is resolved once the corresponding [`CancellationToken`]
+/// was cancelled
+#[must_use = "futures do nothing unless polled"]
+pub struct WaitForCancellationFuture<'a> {
+ /// The CancellationToken that is associated with this WaitForCancellationFuture
+ cancellation_token: Option<&'a CancellationToken>,
+ /// Node for waiting at the cancellation_token
+ wait_node: ListNode<WaitQueueEntry>,
+ /// Whether this future was registered at the token yet as a waiter
+ is_registered: bool,
+}
+
+// Safety: Futures can be sent between threads as long as the underlying
+// cancellation_token is thread-safe (Sync),
+// which allows to poll/register/unregister from a different thread.
+unsafe impl<'a> Send for WaitForCancellationFuture<'a> {}
+
+// ===== impl CancellationToken =====
+
impl core::fmt::Debug for CancellationToken {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("CancellationToken")
@@ -114,8 +131,7 @@ impl Drop for CancellationToken {
impl CancellationToken {
/// Creates a new CancellationToken in the non-cancelled state.
- #[allow(dead_code)]
- pub(crate) fn new() -> CancellationToken {
+ pub fn new() -> CancellationToken {
let state = Box::new(CancellationTokenState::new(
None,
StateSnapshot {
@@ -177,8 +193,7 @@ impl CancellationToken {
/// assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
- #[allow(dead_code)]
- pub(crate) fn child_token(&self) -> CancellationToken {
+ pub fn child_token(&self) -> CancellationToken {
let inner = self.state();
// Increment the refcount of this token. It will be referenced by the
@@ -232,31 +247,11 @@ impl CancellationToken {
}
}
- /// Returns the number of child tokens
- #[cfg(all(test, not(loom)))]
- fn child_tokens(&self) -> usize {
- let mut result = 0;
- let inner = self.state();
-
- let guard = inner.synchronized.lock().unwrap();
- let mut child = guard.first_child;
- while let Some(mut c) = child {
- result += 1;
- // Safety: The child state is accessed from within a Mutex. Since
- // the child needs to take the Mutex to unregister itself before
- // getting destroyed, it is guaranteed to be alive.
- child = unsafe { c.as_mut().from_parent.next_peer };
- }
-
- result
- }
-
/// Cancel the [`CancellationToken`] and all child tokens which had been
/// derived from it.
///
/// This will wake up all tasks which are waiting for cancellation.
- #[allow(dead_code)]
- pub(crate) fn cancel(&self) {
+ pub fn cancel(&self) {
self.state().cancel();
}
@@ -295,22 +290,7 @@ impl CancellationToken {
}
}
-/// A Future that is resolved once the corresponding [`CancellationToken`]
-/// was cancelled
-#[must_use = "futures do nothing unless polled"]
-pub struct WaitForCancellationFuture<'a> {
- /// The CancellationToken that is associated with this WaitForCancellationFuture
- cancellation_token: Option<&'a CancellationToken>,
- /// Node for waiting at the cancellation_token
- wait_node: ListNode<WaitQueueEntry>,
- /// Whether this future was registered at the token yet as a waiter
- is_registered: bool,
-}
-
-// Safety: Futures can be sent between threads as long as the underlying
-// cancellation_token is thread-safe (Sync),
-// which allows to poll/register/unregister from a different thread.
-unsafe impl<'a> Send for WaitForCancellationFuture<'a> {}
+// ===== impl WaitForCancellationFuture =====
impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
@@ -879,308 +859,3 @@ impl CancellationTokenState {
wait_node.task = None;
}
}
-
-#[cfg(all(test, not(loom)))]
-mod tests {
- use super::*;
- use crate::pin;
- use futures_test::task::new_count_waker;
-
- #[test]
- fn cancel_token() {
- let (waker, wake_counter) = new_count_waker();
- let token = CancellationToken::new();
- assert_eq!(false, token.is_cancelled());
-
- let wait_fut = token.cancelled();
- pin!(wait_fut);
-
- assert_eq!(
- Poll::Pending,
- wait_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(wake_counter, 0);
-
- let wait_fut_2 = token.cancelled();
- pin!(wait_fut_2);
-
- token.cancel();
- assert_eq!(wake_counter, 1);
- assert_eq!(true, token.is_cancelled());
-
- assert_eq!(
- Poll::Ready(()),
- wait_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Ready(()),
- wait_fut_2.as_mut().poll(&mut Context::from_waker(&waker))
- );
- }
-
- #[test]
- fn cancel_child_token_through_parent() {
- let (waker, wake_counter) = new_count_waker();
- let token = CancellationToken::new();
-
- let child_token = token.child_token();
- assert_eq!(
- StateSnapshot {
- refcount: 1,
- has_parent_ref: true,
- cancel_state: CancellationState::NotCancelled,
- },
- child_token.state().snapshot()
- );
-
- let child_fut = child_token.cancelled();
- pin!(child_fut);
- let parent_fut = token.cancelled();
- pin!(parent_fut);
-
- assert_eq!(
- Poll::Pending,
- child_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Pending,
- parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(wake_counter, 0);
-
- token.cancel();
- assert_eq!(wake_counter, 2);
- assert_eq!(true, token.is_cancelled());
- assert_eq!(true, child_token.is_cancelled());
- assert_eq!(
- StateSnapshot {
- refcount: 1,
- has_parent_ref: false,
- cancel_state: CancellationState::Cancelled,
- },
- child_token.state().snapshot()
- );
-
- assert_eq!(
- Poll::Ready(()),
- child_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Ready(()),
- parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- }
-
- #[test]
- fn cancel_child_token_without_parent() {
- let (waker, wake_counter) = new_count_waker();
- let token = CancellationToken::new();
-
- let child_token_1 = token.child_token();
-
- let child_fut = child_token_1.cancelled();
- pin!(child_fut);
- let parent_fut = token.cancelled();
- pin!(parent_fut);
-
- assert_eq!(
- Poll::Pending,
- child_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Pending,
- parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(wake_counter, 0);
-
- child_token_1.cancel();
- assert_eq!(wake_counter, 1);
- assert_eq!(false, token.is_cancelled());
- assert_eq!(true, child_token_1.is_cancelled());
- assert_eq!(
- StateSnapshot {
- refcount: 1,
- has_parent_ref: true,
- cancel_state: CancellationState::Cancelled,
- },
- child_token_1.state().snapshot()
- );
-
- assert_eq!(
- Poll::Ready(()),
- child_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Pending,
- parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
-
- let child_token_2 = token.child_token();
- let child_fut_2 = child_token_2.cancelled();
- pin!(child_fut_2);
-
- assert_eq!(
- Poll::Pending,
- child_fut_2.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Pending,
- parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
-
- token.cancel();
- assert_eq!(wake_counter, 3);
- assert_eq!(true, token.is_cancelled());
- assert_eq!(true, child_token_2.is_cancelled());
-
- assert_eq!(
- Poll::Ready(()),
- child_fut_2.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Ready(()),
- parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- }
-
- #[test]
- fn create_child_token_after_parent_was_cancelled() {
- for drop_child_first in [true, false].iter().cloned() {
- let (waker, wake_counter) = new_count_waker();
- let token = CancellationToken::new();
- token.cancel();
-
- let child_token = token.child_token();
- assert_eq!(
- StateSnapshot {
- refcount: 1,
- has_parent_ref: false,
- cancel_state: CancellationState::Cancelled,
- },
- child_token.state().snapshot()
- );
-
- {
- let child_fut = child_token.cancelled();
- pin!(child_fut);
- let parent_fut = token.cancelled();
- pin!(parent_fut);
-
- assert_eq!(
- Poll::Ready(()),
- child_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(
- Poll::Ready(()),
- parent_fut.as_mut().poll(&mut Context::from_waker(&waker))
- );
- assert_eq!(wake_counter, 0);
-
- drop(child_fut);
- drop(parent_fut);
- }
-
- if drop_child_first {
- drop(child_token);
- drop(token);
- } else {
- drop(token);
- drop(child_token);
- }
- }
- }
-
- #[test]
- fn drop_multiple_child_tokens() {
- for drop_first_child_first in &[true, false] {
- let token = CancellationToken::new();
- let mut child_tokens = [None, None, None];
- for i in 0..child_tokens.len() {
- child_tokens[i] = Some(token.child_token());
- }
- assert_eq!(child_tokens.len(), token.child_tokens());
-
- assert_eq!(
- StateSnapshot {
- refcount: child_tokens.len() + 1,
- has_parent_ref: false,
- cancel_state: CancellationState::NotCancelled,
- },
- token.state().snapshot()
- );
- assert_eq!(
- StateSnapshot {
- refcount: 1,
- has_parent_ref: true,
- cancel_state: CancellationState::NotCancelled,
- },
- child_tokens[0].as_ref().unwrap().state().snapshot()
- );
-
- let mut remaining_childs = child_tokens.len();
- for i in 0..child_tokens.len() {
- if *drop_first_child_first {
- child_tokens[i] = None;
- } else {
- child_tokens[child_tokens.len() - 1 - i] = None;
- }
- remaining_childs -= 1;
- assert_eq!(
- StateSnapshot {
- refcount: remaining_childs + 1,
- has_parent_ref: false,
- cancel_state: CancellationState::NotCancelled,
- },
- token.state().snapshot()
- );
- assert_eq!(remaining_childs, token.child_tokens());
- }
-
- drop(token);
- }
- }
-
- #[test]
- fn drop_parent_before_child_tokens() {
- let token = CancellationToken::new();
- let child1 = token.child_token();
- let child2 = token.child_token();
-
- drop(token);
- assert_eq!(
- StateSnapshot {
- refcount: 1,
- has_parent_ref: true,
- cancel_state: CancellationState::NotCancelled,
- },
- child1.state().snapshot()
- );
-
- drop(child1);
- drop(child2);
- }
-
- #[test]
- fn parent_refcoutn_only_decreases_if_all_child_clones_are_released() {
- let token = CancellationToken::new();
- let child1 = token.child_token();
- let child2 = child1.clone();
-
- assert_eq!(2, token.state().snapshot().refcount);
-
- drop(child1);
- assert_eq!(2, token.state().snapshot().refcount);
- drop(child2);
- assert_eq!(1, token.state().snapshot().refcount);
- }
-
- #[test]
- fn cancellation_future_is_send() {
- let token = CancellationToken::new();
- let fut = token.cancelled();
-
- fn with_send<T: Send>(_: T) {}
-
- with_send(fut);
- }
-}
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index b2c66e44..e558634f 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -425,10 +425,10 @@ cfg_sync! {
pub mod broadcast;
- #[cfg(tokio_unstable)]
- mod cancellation_token;
- #[cfg(tokio_unstable)]
- pub use cancellation_token::{CancellationToken, WaitForCancellationFuture};
+ cfg_unstable! {
+ mod cancellation_token;
+ pub use cancellation_token::{CancellationToken, WaitForCancellationFuture};
+ }
pub mod mpsc;
diff --git a/tokio/src/sync/tests/loom_cancellation_token.rs b/tokio/src/sync/tests/loom_cancellation_token.rs
index aac84f80..e9c9f3dd 100644
--- a/tokio/src/sync/tests/loom_cancellation_token.rs
+++ b/tokio/src/sync/tests/loom_cancellation_token.rs
@@ -1,4 +1,4 @@
-use crate::scope::CancellationToken;
+use crate::sync::CancellationToken;
use loom::{future::block_on, thread};
use tokio_test::assert_ok;