//! An asynchronously awaitable `CancellationToken`.
//! The token allows signalling a cancellation request to one or more tasks.
use crate::{
loom::sync::{atomic::AtomicUsize, Mutex},
util::intrusive_double_linked_list::{LinkedList, ListNode},
};
use core::{
future::Future,
pin::Pin,
ptr::NonNull,
sync::atomic::Ordering,
task::{Context, Poll, Waker},
};
/// A token which can be used to signal a cancellation request to one or more
/// tasks.
///
/// A task that wants to observe cancellation calls
/// [`CancellationToken::cancelled()`] to obtain a future which resolves once
/// cancellation has been requested.
///
/// Cancellation is requested through the [`CancellationToken::cancel`] method.
///
/// Cloning a token does not create an independent token: the clone refers to
/// the same reference-counted state as the token it was cloned from.
///
/// # Examples
///
/// ```ignore
/// use tokio::select;
/// use tokio::scope::CancellationToken;
///
/// #[tokio::main]
/// async fn main() {
///     let token = CancellationToken::new();
///     let cloned_token = token.clone();
///
///     let join_handle = tokio::spawn(async move {
///         // Wait for either cancellation or a very long time
///         select! {
///             _ = cloned_token.cancelled() => {
///                 // The token was cancelled
///                 5
///             }
///             _ = tokio::time::delay_for(std::time::Duration::from_secs(9999)) => {
///                 99
///             }
///         }
///     });
///
///     tokio::spawn(async move {
///         tokio::time::delay_for(std::time::Duration::from_millis(10)).await;
///         token.cancel();
///     });
///
///     assert_eq!(5, join_handle.await.unwrap());
/// }
/// ```
pub struct CancellationToken {
// Pointer to the reference-counted state shared by every clone of this
// token. `Clone` copies this pointer after incrementing the refcount, and
// `Drop` decrements the refcount again.
inner: NonNull<CancellationTokenState>,
}
// `CancellationToken` stores a `NonNull` pointer, which is neither `Send` nor
// `Sync` on its own, so both auto traits have to be opted into manually.
//
// SAFETY: The CancellationToken is thread-safe and can be moved between
// threads, since all methods are internally synchronized.
unsafe impl Send for CancellationToken {}
// SAFETY: Same reasoning as for `Send` above: all methods are internally
// synchronized, so shared references may be used from several threads.
unsafe impl Sync for CancellationToken {}
impl core::fmt::Debug for CancellationToken {
    /// Formats the token as `CancellationToken { is_cancelled: .. }`.
    ///
    /// Only the cancellation flag is printed; the raw pointer to the shared
    /// state is deliberately left out of the output.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Query the flag first so the builder below only deals with plain data.
        let cancelled = self.is_cancelled();

        let mut builder = f.debug_struct("CancellationToken");
        builder.field("is_cancelled", &cancelled);
        builder.finish()
    }
}
impl Clone for CancellationToken {
fn clone(&self) -> Self {
// Safety: The state inside a `CancellationToken` is always valid, since
// is reference counted
let inner = self.state();
// Tokens are cloned by increasing their refcount
let current_state = inner.snapshot();
inner.increment_refcount(current_state);
CancellationToken { inner: self.inner }
}
}
impl Drop for CancellationToken {
    /// Releases this handle's reference on the shared state.
    ///
    /// When the released reference was the last one (the refcount reaches
    /// zero) and the state has a parent, the state is additionally
    /// unregistered from that parent.
    fn drop(&mut self) {
        let own_state_ptr = self.inner;

        // SAFETY: The state inside a `CancellationToken` is always valid,
        // since it is reference counted and this token still holds a reference.
        let state = unsafe { &mut *own_state_ptr.as_ptr() };

        let before = state.snapshot();

        // The parent must be read *before* the refcount is decremented: the
        // decrement below might release the state, after which it must not be
        // touched any more.
        let parent_ptr = state.parent;

        // Give up our own reference.
        let after = state.decrement_refcount(before);

        // Other references remain, so there is nothing left to do.
        if after.refcount != 0 {
            return;
        }

        // This was the last reference: unregister from the parent, if any.
        if let Some(mut parent_ptr) = parent_ptr {
            // SAFETY: Since we still retain a reference on the parent, it must
            // be valid.
            let parent_state = unsafe { parent_ptr.as_mut() };
            parent_state.unregister_child(own_state_ptr, after);
        }
    }
}
impl CancellationToken {
/// Creates a new CancellationToken in the non-cancelled state.
#[allow(dead_code)]
pub(crate) fn new() -> CancellationToken {
let state = Box::new(CancellationTokenState::new(
None,
StateSnapshot {
cancel_state: CancellationState::NotCancelled,
has_parent_ref: false,
refcount: 1,
},
));
// Safety: We just created the Box. The pointer is guaranteed to be
// not null
CancellationToken {
inner: unsafe {