diff options
author | Carl Lerche <me@carllerche.com> | 2020-08-23 08:45:52 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-23 17:45:52 +0200 |
commit | 9d58b70151d7dbb66139125520d383401396eb98 (patch) | |
tree | 3a5eed29fb88cad906a113207fc040ce56433bdc /tokio-util | |
parent | fde72bf047080287f92e24f025301e6b7325c341 (diff) |
sync: move CancellationToken to tokio-util (#2721)
* sync: move CancellationToken to tokio-util
The `CancellationToken` utility is only available with the
`tokio_unstable` flag. This was done as the API is not final, but it
adds friction for users.
This patch moves `CancellationToken` to tokio-util where it is generally
available. The tokio-util crate does not have any constraints on
breaking change releases.
* fix clippy
* clippy again
Diffstat (limited to 'tokio-util')
-rw-r--r-- | tokio-util/Cargo.toml | 1 | ||||
-rw-r--r-- | tokio-util/src/lib.rs | 4 | ||||
-rw-r--r-- | tokio-util/src/loom.rs | 1 | ||||
-rw-r--r-- | tokio-util/src/sync/cancellation_token.rs | 867 | ||||
-rw-r--r-- | tokio-util/src/sync/intrusive_double_linked_list.rs | 788 | ||||
-rw-r--r-- | tokio-util/src/sync/mod.rs | 6 | ||||
-rw-r--r-- | tokio-util/src/sync/tests/loom_cancellation_token.rs | 155 | ||||
-rw-r--r-- | tokio-util/src/sync/tests/mod.rs | 1 | ||||
-rw-r--r-- | tokio-util/tests/sync_cancellation_token.rs | 218 |
9 files changed, 2041 insertions, 0 deletions
diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index de9bd000..b47c9dfc 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -46,6 +46,7 @@ tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } tokio-test = { version = "0.3.0", path = "../tokio-test" } futures = "0.3.0" +futures-test = "0.3.5" [package.metadata.docs.rs] all-features = true diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index e56f7a5f..ec69f59d 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -24,6 +24,8 @@ #[macro_use] mod cfg; +mod loom; + cfg_codec! { pub mod codec; } @@ -35,3 +37,5 @@ cfg_udp! { cfg_compat! { pub mod compat; } + +pub mod sync; diff --git a/tokio-util/src/loom.rs b/tokio-util/src/loom.rs new file mode 100644 index 00000000..dd03feab --- /dev/null +++ b/tokio-util/src/loom.rs @@ -0,0 +1 @@ +pub(crate) use std::sync; diff --git a/tokio-util/src/sync/cancellation_token.rs b/tokio-util/src/sync/cancellation_token.rs new file mode 100644 index 00000000..baab5ce2 --- /dev/null +++ b/tokio-util/src/sync/cancellation_token.rs @@ -0,0 +1,867 @@ +//! An asynchronously awaitable `CancellationToken`. +//! The token allows to signal a cancellation request to one or more tasks. + +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; +use crate::sync::intrusive_double_linked_list::{LinkedList, ListNode}; + +use core::future::Future; +use core::pin::Pin; +use core::ptr::NonNull; +use core::sync::atomic::Ordering; +use core::task::{Context, Poll, Waker}; + +/// A token which can be used to signal a cancellation request to one or more +/// tasks. +/// +/// Tasks can call [`CancellationToken::cancelled()`] in order to +/// obtain a Future which will be resolved when cancellation is requested. +/// +/// Cancellation can be requested through the [`CancellationToken::cancel`] method. +/// +/// # Examples +/// +/// ```ignore +/// use tokio::select; +/// use tokio::scope::CancellationToken; +/// +/// #[tokio::main] +/// async fn main() { +/// let token = CancellationToken::new(); +/// let cloned_token = token.clone(); +/// +/// let join_handle = tokio::spawn(async move { +/// // Wait for either cancellation or a very long time +/// select! { +/// _ = cloned_token.cancelled() => { +/// // The token was cancelled +/// 5 +/// } +/// _ = tokio::time::delay_for(std::time::Duration::from_secs(9999)) => { +/// 99 +/// } +/// } +/// }); +/// +/// tokio::spawn(async move { +/// tokio::time::delay_for(std::time::Duration::from_millis(10)).await; +/// token.cancel(); +/// }); +/// +/// assert_eq!(5, join_handle.await.unwrap()); +/// } +/// ``` +pub struct CancellationToken { + inner: NonNull<CancellationTokenState>, +} + +// Safety: The CancellationToken is thread-safe and can be moved between threads, +// since all methods are internally synchronized. +unsafe impl Send for CancellationToken {} +unsafe impl Sync for CancellationToken {} + +/// A Future that is resolved once the corresponding [`CancellationToken`] +/// was cancelled +#[must_use = "futures do nothing unless polled"] +pub struct WaitForCancellationFuture<'a> { + /// The CancellationToken that is associated with this WaitForCancellationFuture + cancellation_token: Option<&'a CancellationToken>, + /// Node for waiting at the cancellation_token + wait_node: ListNode<WaitQueueEntry>, + /// Whether this future was registered at the token yet as a waiter + is_registered: bool, +} + +// Safety: Futures can be sent between threads as long as the underlying +// cancellation_token is thread-safe (Sync), +// which allows to poll/register/unregister from a different thread. +unsafe impl<'a> Send for WaitForCancellationFuture<'a> {} + +// ===== impl CancellationToken ===== + +impl core::fmt::Debug for CancellationToken { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("CancellationToken") + .field("is_cancelled", &self.is_cancelled()) + .finish() + } +} + +impl Clone for CancellationToken { + fn clone(&self) -> Self { + // Safety: The state inside a `CancellationToken` is always valid, since + // is reference counted + let inner = self.state(); + + // Tokens are cloned by increasing their refcount + let current_state = inner.snapshot(); + inner.increment_refcount(current_state); + + CancellationToken { inner: self.inner } + } +} + +impl Drop for CancellationToken { + fn drop(&mut self) { + let token_state_pointer = self.inner; + + // Safety: The state inside a `CancellationToken` is always valid, since + // is reference counted + let inner = unsafe { &mut *self.inner.as_ptr() }; + + let mut current_state = inner.snapshot(); + + // We need to safe the parent, since the state might be released by the + // next call + let parent = inner.parent; + + // Drop our own refcount + current_state = inner.decrement_refcount(current_state); + + // If this was the last reference, unregister from the parent + if current_state.refcount == 0 { + if let Some(mut parent) = parent { + // Safety: Since we still retain a reference on the parent, it must be valid. + let parent = unsafe { parent.as_mut() }; + parent.unregister_child(token_state_pointer, current_state); + } + } + } +} + +impl Default for CancellationToken { + fn default() -> CancellationToken { + CancellationToken::new() + } +} + +impl CancellationToken { + /// Creates a new CancellationToken in the non-cancelled state. + pub fn new() -> CancellationToken { + let state = Box::new(CancellationTokenState::new( + None, + StateSnapshot { + cancel_state: CancellationState::NotCancelled, + has_parent_ref: false, + refcount: 1, + }, + )); + + // Safety: We just created the Box. The pointer is guaranteed to be + // not null + CancellationToken { + inner: unsafe { NonNull::new_unchecked(Box::into_raw(state)) }, + } + } + + /// Returns a reference to the utilized `CancellationTokenState`. + fn state(&self) -> &CancellationTokenState { + // Safety: The state inside a `CancellationToken` is always valid, since + // is reference counted + unsafe { &*self.inner.as_ptr() } + } + + /// Creates a `CancellationToken` which will get cancelled whenever the + /// current token gets cancelled. + /// + /// If the current token is already cancelled, the child token will get + /// returned in cancelled state. + /// + /// # Examples + /// + /// ```ignore + /// use tokio::select; + /// use tokio::scope::CancellationToken; + /// + /// #[tokio::main] + /// async fn main() { + /// let token = CancellationToken::new(); + /// let child_token = token.child_token(); + /// + /// let join_handle = tokio::spawn(async move { + /// // Wait for either cancellation or a very long time + /// select! { + /// _ = child_token.cancelled() => { + /// // The token was cancelled + /// 5 + /// } + /// _ = tokio::time::delay_for(std::time::Duration::from_secs(9999)) => { + /// 99 + /// } + /// } + /// }); + /// + /// tokio::spawn(async move { + /// tokio::time::delay_for(std::time::Duration::from_millis(10)).await; + /// token.cancel(); + /// }); + /// + /// assert_eq!(5, join_handle.await.unwrap()); + /// } + /// ``` + pub fn child_token(&self) -> CancellationToken { + let inner = self.state(); + + // Increment the refcount of this token. It will be referenced by the + // child, independent of whether the child is immediately cancelled or + // not. + let _current_state = inner.increment_refcount(inner.snapshot()); + + let mut unpacked_child_state = StateSnapshot { + has_parent_ref: true, + refcount: 1, + cancel_state: CancellationState::NotCancelled, + }; + let mut child_token_state = Box::new(CancellationTokenState::new( + Some(self.inner), + unpacked_child_state, + )); + + { + let mut guard = inner.synchronized.lock().unwrap(); + if guard.is_cancelled { + // This task was already cancelled. In this case we should not + // insert the child into the list, since it would never get removed + // from the list. + (*child_token_state.synchronized.lock().unwrap()).is_cancelled = true; + unpacked_child_state.cancel_state = CancellationState::Cancelled; + // Since it's not in the list, the parent doesn't need to retain + // a reference to it. + unpacked_child_state.has_parent_ref = false; + child_token_state + .state + .store(unpacked_child_state.pack(), Ordering::SeqCst); + } else { + if let Some(mut first_child) = guard.first_child { + child_token_state.from_parent.next_peer = Some(first_child); + // Safety: We manipulate other child task inside the Mutex + // and retain a parent reference on it. The child token can't + // get invalidated while the Mutex is held. + unsafe { + first_child.as_mut().from_parent.prev_peer = + Some((&mut *child_token_state).into()) + }; + } + guard.first_child = Some((&mut *child_token_state).into()); + } + }; + + let child_token_ptr = Box::into_raw(child_token_state); + // Safety: We just created the pointer from a `Box` + CancellationToken { + inner: unsafe { NonNull::new_unchecked(child_token_ptr) }, + } + } + + /// Cancel the [`CancellationToken`] and all child tokens which had been + /// derived from it. + /// + /// This will wake up all tasks which are waiting for cancellation. + pub fn cancel(&self) { + self.state().cancel(); + } + + /// Returns `true` if the `CancellationToken` had been cancelled + pub fn is_cancelled(&self) -> bool { + self.state().is_cancelled() + } + + /// Returns a `Future` that gets fulfilled when cancellation is requested. + pub fn cancelled(&self) -> WaitForCancellationFuture<'_> { + WaitForCancellationFuture { + cancellation_token: Some(self), + wait_node: ListNode::new(WaitQueueEntry::new()), + is_registered: false, + } + } + + unsafe fn register( + &self, + wait_node: &mut ListNode<WaitQueueEntry>, + cx: &mut Context<'_>, + ) -> Poll<()> { + self.state().register(wait_node, cx) + } + + fn check_for_cancellation( + &self, + wait_node: &mut ListNode<WaitQueueEntry>, + cx: &mut Context<'_>, + ) -> Poll<()> { + self.state().check_for_cancellation(wait_node, cx) + } + + fn unregister(&self, wait_node: &mut ListNode<WaitQueueEntry>) { + self.state().unregister(wait_node) + } +} + +// ===== impl WaitForCancellationFuture ===== + +impl<'a> core::fmt::Debug for WaitForCancellationFuture<'a> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("WaitForCancellationFuture").finish() + } +} + +impl<'a> Future for WaitForCancellationFuture<'a> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + // Safety: We do not move anything out of `WaitForCancellationFuture` + let mut_self: &mut WaitForCancellationFuture<'_> = unsafe { Pin::get_unchecked_mut(self) }; + + let cancellation_token = mut_self + .cancellation_token + .expect("polled WaitForCancellationFuture after completion"); + + let poll_res = if !mut_self.is_registered { + // Safety: The `ListNode` is pinned through the Future, + // and we will unregister it in `WaitForCancellationFuture::drop` + // before the Future is dropped and the memory reference is invalidated. + unsafe { cancellation_token.register(&mut mut_self.wait_node, cx) } + } else { + cancellation_token.check_for_cancellation(&mut mut_self.wait_node, cx) + }; + + if let Poll::Ready(()) = poll_res { + // The cancellation_token was signalled + mut_self.cancellation_token = None; + // A signalled Token means the Waker won't be enqueued anymore + mut_self.is_registered = false; + mut_self.wait_node.task = None; + } else { + // This `Future` and its stored `Waker` stay registered at the + // `CancellationToken` + mut_self.is_registered = true; + } + + poll_res + } +} + +impl<'a> Drop for WaitForCancellationFuture<'a> { + fn drop(&mut self) { + // If this WaitForCancellationFuture has been polled and it was added to the + // wait queue at the cancellation_token, it must be removed before dropping. + // Otherwise the cancellation_token would access invalid memory. + if let Some(token) = self.cancellation_token { + if self.is_registered { + token.unregister(&mut self.wait_node); + } + } + } +} + +/// Tracks how the future had interacted with the [`CancellationToken`] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum PollState { + /// The task has never interacted with the [`CancellationToken`]. + New, + /// The task was added to the wait queue at the [`CancellationToken`]. + Waiting, + /// The task has been polled to completion. + Done, +} + +/// Tracks the WaitForCancellationFuture waiting state. +/// Access to this struct is synchronized through the mutex in the CancellationToken. +struct WaitQueueEntry { + /// The task handle of the waiting task + task: Option<Waker>, + // Current polling state. This state is only updated inside the Mutex of + // the CancellationToken. + state: PollState, +} + +impl WaitQueueEntry { + /// Creates a new WaitQueueEntry + fn new() -> WaitQueueEntry { + WaitQueueEntry { + task: None, + state: PollState::New, + } + } +} + +struct SynchronizedState { + waiters: LinkedList<WaitQueueEntry>, + first_child: Option<NonNull<CancellationTokenState>>, + is_cancelled: bool, +} + +impl SynchronizedState { + fn new() -> Self { + Self { + waiters: LinkedList::new(), + first_child: None, + is_cancelled: false, + } + } +} + +/// Information embedded in child tokens which is synchronized through the Mutex +/// in their parent. +struct SynchronizedThroughParent { + next_peer: Option<NonNull<CancellationTokenState>>, + prev_peer: Option<NonNull<CancellationTokenState>>, +} + +/// Possible states of a `CancellationToken` +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum CancellationState { + NotCancelled = 0, + Cancelling = 1, + Cancelled = 2, +} + +impl CancellationState { + fn pack(self) -> usize { + self as usize + } + + fn unpack(value: usize) -> Self { + match value { + 0 => CancellationState::NotCancelled, + 1 => CancellationState::Cancelling, + 2 => CancellationState::Cancelled, + _ => unreachable!("Invalid value"), + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +struct StateSnapshot { + /// The amount of references to this particular CancellationToken. + /// `CancellationToken` structs hold these references to a `CancellationTokenState`. + /// Also the state is referenced by the state of each child. + refcount: usize, + /// Whether the state is still referenced by it's parent and can therefore + /// not be freed. + has_parent_ref: bool, + /// Whether the token is cancelled + cancel_state: CancellationState, +} + +impl StateSnapshot { + /// Packs the snapshot into a `usize` + fn pack(self) -> usize { + self.refcount << 3 | if self.has_parent_ref { 4 } else { 0 } | self.cancel_state.pack() + } + + /// Unpacks the snapshot from a `usize` + fn unpack(value: usize) -> Self { + let refcount = value >> 3; + let has_parent_ref = value & 4 != 0; + let cancel_state = CancellationState::unpack(value & 0x03); + + StateSnapshot { + refcount, + has_parent_ref, + cancel_state, + } + } + + /// Whether this `CancellationTokenState` is still referenced by any + /// `CancellationToken`. + fn has_refs(&self) -> bool { + self.refcount != 0 || self.has_parent_ref + } +} + +/// The maximum permitted amount of references to a CancellationToken. This +/// is derived from the intent to never use more than 32bit in the `Snapshot`. +const MAX_REFS: u32 = (std::u32::MAX - 7) >> 3; + +/// Internal state of the `CancellationToken` pair above +struct CancellationTokenState { + state: AtomicUsize, + parent: Option<NonNull<CancellationTokenState>>, + from_parent: SynchronizedThroughParent, + synchronized: Mutex<SynchronizedState>, +} + +impl CancellationTokenState { + fn new( + parent: Option<NonNull<CancellationTokenState>>, + state: StateSnapshot, + ) -> CancellationTokenState { + CancellationTokenState { + parent, + from_parent: SynchronizedThroughParent { + prev_peer: None, + next_peer: None, + }, + state: AtomicUsize::new(state.pack()), + synchronized: Mutex::new(SynchronizedState::new()), + } + } + + /// Returns a snapshot of the current atomic state of the token + fn snapshot(&self) -> StateSnapshot { + StateSnapshot::unpack(self.state.load(Ordering::SeqCst)) + } + + fn atomic_update_state<F>(&self, mut current_state: StateSnapshot, func: F) -> StateSnapshot + where + F: Fn(StateSnapshot) -> StateSnapshot, + { + let mut current_packed_state = current_state.pack(); + loop { + let next_state = func(current_state); + match self.state.compare_exchange( + current_packed_state, + next_state.pack(), + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => { + return next_state; + } + Err(actual) => { + current_packed_state = actual; + current_state = StateSnapshot::unpack(actual); + } + } + } + } + + fn increment_refcount(&self, current_state: StateSnapshot) -> StateSnapshot { + self.atomic_update_state(current_state, |mut state: StateSnapshot| { + if state.refcount >= MAX_REFS as usize { + eprintln!("[ERROR] Maximum reference count for CancellationToken was exceeded"); + std::process::abort(); + } + state.refcount += 1; + state + }) + } + + fn decrement_refcount(&self, current_state: StateSnapshot) -> StateSnapshot { + let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| { + state.refcount -= 1; + state + }); + + // Drop the State if it is not referenced anymore + if !current_state.has_refs() { + // Safety: `CancellationTokenState` is always stored in refcounted + // Boxes + let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) }; + } + + current_state + } + + fn remove_parent_ref(&self, current_state: StateSnapshot) -> StateSnapshot { + let current_state = self.atomic_update_state(current_state, |mut state: StateSnapshot| { + state.has_parent_ref = false; + state + }); + + // Drop the State if it is not referenced anymore + if !current_state.has_refs() { + // Safety: `CancellationTokenState` is always stored in refcounted + // Boxes + let _ = unsafe { Box::from_raw(self as *const Self as *mut Self) }; + } + + current_state + } + + /// Unregisters a child from the parent token. + /// The child tokens state is not exactly known at this point in time. + /// If the parent token is cancelled, the child token gets removed from the + /// parents list, and might therefore already have been freed. If the parent + /// token is not cancelled, the child token is still valid. + fn unregister_child( + &mut self, + mut child_state: NonNull<CancellationTokenState>, + current_child_state: StateSnapshot, + ) { + let removed_child = { + // Remove the child toke from the parents linked list + let mut guard = self.synchronized.lock().unwrap(); + if !guard.is_cancelled { + // Safety: Since the token was not cancelled, the child must + // still be in the list and valid. + let mut child_state = unsafe { child_state.as_mut() }; + debug_assert!(child_state.snapshot().has_parent_ref); + + if guard.first_child == Some(child_state.into()) { + guard.first_child = child_state.from_parent.next_peer; + } + // Safety: If peers wouldn't be valid anymore, they would try + // to remove themselves from the list. This would require locking + // the Mutex that we currently own. + unsafe { + if let Some(mut prev_peer) = child_state.from_parent.prev_peer { + prev_peer.as_mut().from_parent.next_peer = + child_state.from_parent.next_peer; + } + if let Some(mut next_peer) = child_state.from_parent.next_peer { + next_peer.as_mut().from_parent.prev_peer = + child_state.from_parent.prev_peer; + } + } + child_state.from_parent.prev_peer = None; + child_state.from_parent.next_peer = None; + + // The child is no longer referenced by the parent, since we were able + // to remove its reference from the parents list. + true + } else { + // Do not touch the linked list anymore. If the parent is cancelled + // it will move all childs outside of the Mutex and manipulate + // the pointers there. Manipulating the pointers here too could + // lead to races. Therefore leave them just as as and let the + // parent deal with it. The parent will make sure to retain a + // reference to this state as long as it manipulates the list + // pointers. Therefore the pointers are not dangling. + false + } + }; + + if removed_child { + // If the token removed itself from the parents list, it can reset + // the the parent ref status. If it is isn't able to do so, because the + // parent removed it from the list, there is no need to do this. + // The parent ref acts as as another reference count. Therefore + // removing this reference can free the object. + // Safety: The token was in the list. This means the parent wasn't + // cancelled before, and the token must still be alive. + unsafe { child_state.as_mut().remove_parent_ref(current_child_state) }; + } + + // Decrement the refcount on the parent and free it if necessary + self.decrement_refcount(self.snapshot()); + } + + fn cancel(&self) { + // Move the state of the CancellationToken from `NotCancelled` to `Cancelling` + let mut current_state = self.snapshot(); + + let state_after_cancellation = loop { + if current_state.cancel_state != CancellationState::NotCancelled { + // Another task already initiated the cancellation + return; + } + + let mut next_state = current_state; + next_state.cancel_state = CancellationState::Cancelling; + match self.state.compare_exchange( + current_state.pack(), + next_state.pack(), + Ordering::SeqCst, + Ordering::SeqCst, + ) { + Ok(_) => break next_state, + Err(actual) => current_state = StateSnapshot::unpack(actual), + } + }; + + // This task cancelled the token + + // Take the task list out of the Token + // We do not want to cancel child token inside this lock. If one of the + // child tasks would have additional child tokens, we would recursively + // take locks. + + // Doing this action has an impact if the child token is dropped concurrently: + // It will try to deregister itself from the parent task, but can not find + // itself in the task list anymore. Therefore it needs to assume the parent + // has extracted the list and will process it. It may not modify the list. + // This is OK from a memory safety perspective, since the parent still + // retains a reference to the child task until it finished iterating over + // it. + + let mut first_child = { + let mut guard = self.synchronized.lock().unwrap(); + // Save the cancellation also inside the Mutex + // This allows child tokens which want to detach themselves to detect + // that this is no longer required since the parent cleared the list. + guard.is_cancelled = true; + + // Wakeup all waiters + // This happens inside the lock to make cancellation reliable + // If we would access waiters outside of the lock, the pointers + // may no longer be valid. + // Typically this shouldn't be an issue, since waking a task should + // only move it from the blocked into the ready state and not have + // further side effects. + + // Use a reverse iterator, so that the oldest waiter gets + // scheduled first + guard.waiters.reverse_drain(|waiter| { + // We are not allowed to move the `Waker` out of the list node. + // The `Future` relies on the fact that the old `Waker` stays there + // as long as the `Future` has not completed in order to perform + // the `will_wake()` check. + // Therefore `wake_by_ref` is used instead of `wake()` + if let Some(handle) = &mut waiter.task { + handle.wake_by_ref(); + } + // Mark the waiter to have been removed from the list. + waiter.state = PollState::Done; + }); + + guard.first_child.take() + }; + + while let Some(mut child) = first_child { + // Safety: We know this is a valid pointer since it is in our child pointer + // list. It can't have been freed in between, since we retain a a reference + // to each child. + let mut_child = unsafe { child.as_mut() }; + + // Get the next child and clean up list pointers + first_child = mut_child.from_parent.next_peer; + mut_child.from_parent.prev_peer = None; + mut_child.from_parent.next_peer = None; + + // Cancel the child task + mut_child.cancel(); + + // Drop the parent reference. This `CancellationToken` is not interested + // in interacting with the child anymore. + // This is ONLY allowed once we promised not to touch the state anymore + // after this interaction. + mut_child.remove_parent_ref(mut_child.snapshot()); + } + + // The cancellation has completed + // At this point in time tasks which registered a wait node can be sure + // that this wait node already had been dequeued from the list without + // needing to inspect the list. + self.atomic_update_state(state_after_cancellation, |mut state| { + state.cancel_state = CancellationState::Cancelled; + state + }); + } + + /// Returns `true` if the `CancellationToken` had been cancelled + fn is_cancelled(&self) -> bool { + let current_state = self.snapshot(); + current_state.cancel_state != CancellationState::NotCancelled + } + + /// Registers a waiting task at the `CancellationToken`. + /// Safety: This method is only safe as long as the waiting waiting task + /// will properly unregister the wait node before it gets moved. + unsafe fn register( + &self, + wait_node: &mut ListNode<WaitQueueEntry>, + cx: &mut Context<'_>, + ) -> Poll<()> { + debug_assert_eq!(PollState::New, wait_node.state); + let current_state = self.snapshot(); + + // Perform an optimistic cancellation check before. This is not strictly + // necessary since we also check for cancellation in the Mutex, but + // reduces the necessary work to be performed for tasks which already + // had been cancelled. + if current_state.cancel_state != CancellationState::NotCancelled { + return Poll::Ready(()); + } + + // So far the token is not cancelled. However it could be cancelld before + // we get the chance to store the `Waker`. Therfore we need to check + // for cancellation again inside the mutex. + let mut guard = self.synchronized.lock().unwrap(); + if guard.is_cancelled { + // Cancellation was signalled + wait_node.state = PollState::Done; + Poll::Ready(()) + } else { + // Added the task to the wait queue + wait_node.task = Some(cx.waker().clone()); + wait_node.state = PollState::Waiting; + guard.waiters.add_front(wait_node); + Poll::Pending + } + } + + fn check_for_cancellation( + &self, + wait_node: &mut ListNode<WaitQueueEntry>, + cx: &mut Context<'_>, + ) -> Poll<()> { + debug_assert!( + wait_node.task.is_some(), + "Method can only be called after task had been registered" + ); + + let current_state = self.snapshot(); + + if current_state.cancel_state != CancellationState::NotCancelled { + // If the cancellation had been fully completed we know that our `Waker` + // is no longer registered at the `CancellationToken`. + // Otherwise the cancel call may or may not yet have iterated + // through the waiters list and removed the wait nodes. + // If it hasn't yet, we need to remove it. Otherwise an attempt to + // reuse the `wait_nodeĀ“ might get freed due to the `WaitForCancellationFuture` + // getting dropped before the cancellation had interacted with it. + if current_state.cancel_state != CancellationState::Cancelled { + self.unregister(wait_node); + } + Poll::Ready(()) + } else { + // Check if we need to swap the `Waker`. This will make the check more + // expensive, since the `Waker` is synchronized through the Mutex. + // If we don't need to perform a `Waker` update, an atomic check for + // cancellation is sufficient. + let need_waker_update = wait_node + .task + .as_ref() + .map(|waker| waker.will_wake(cx.waker())) + .unwrap_or(true); + + if need_waker_update { + let guard = self.synchronized.lock().unwrap(); + if guard.is_cancelled { + // Cancellation was signalled. Since this cancellation signal + // is set inside the Mutex, the old waiter must already have + // been removed from the waiting list + debug_assert_eq!(PollState::Done, wait_node.state); + wait_node.task = None; + Poll::Ready(()) + } else { + // The WaitForCancellationFuture is already in the queue. + // The CancellationToken can't have been cancelled, + // since this would change the is_cancelled flag inside the mutex. + // Therefore we just have to update the Waker. A follow-up + // cancellation will always use the new waker. + wait_node.task = Some(cx.waker().clone()); + Poll::Pending + } + } else { + // Do nothing. If the token gets cancelled, this task will get + // woken again and can fetch the cancellation. + Poll::Pending + } + } + } + + fn unregister(&self, wait_node: &mut ListNode<WaitQueueEntry>) { + debug_assert!( + wait_node.task.is_some(), + "waiter can not be active without task" + ); + + let mut guard = self.synchronized.lock().unwrap(); + // WaitForCancellationFuture only needs to get removed if it has been added to + // the wait queue of the CancellationToken. + // This has happened in the PollState::Waiting case. + if let PollState::Waiting = wait_node.state { + // Safety: Due to the state, we know that the node must be part + // of the waiter list + if !unsafe { guard.wai |