summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-02-26 11:40:10 -0800
committerGitHub <noreply@github.com>2020-02-26 11:40:10 -0800
commit8b7ea0ff5cad2522d3113b77e9b6d95b507dee3b (patch)
tree82e5bb8cb6f67a07f934d03728dace58330b9604 /tokio/src
parent7207bf355e2b6418bb0d757859a5cdcdedf32530 (diff)
sync: adds Notify for basic task notification (#2210)
`Notify` provides a synchronization primitive similar to thread park / unpark, except for tasks.
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/loom/std/mod.rs1
-rw-r--r--tokio/src/sync/mod.rs12
-rw-r--r--tokio/src/sync/notify.rs523
-rw-r--r--tokio/src/sync/tests/loom_notify.rs90
-rw-r--r--tokio/src/sync/tests/mod.rs1
-rw-r--r--tokio/src/util/linked_list.rs478
-rw-r--r--tokio/src/util/mod.rs4
7 files changed, 1109 insertions, 0 deletions
diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs
index d5e057e5..e4bae357 100644
--- a/tokio/src/loom/std/mod.rs
+++ b/tokio/src/loom/std/mod.rs
@@ -64,6 +64,7 @@ pub(crate) mod sync {
pub(crate) use crate::loom::std::atomic_u64::AtomicU64;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;
+ pub(crate) use std::sync::atomic::AtomicU8;
pub(crate) use std::sync::atomic::{fence, AtomicPtr};
pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
}
diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs
index 7f72bc90..5d7b29ae 100644
--- a/tokio/src/sync/mod.rs
+++ b/tokio/src/sync/mod.rs
@@ -406,9 +406,18 @@
//! * [`Mutex`][Mutex] Mutual Exclusion mechanism, which ensures that at most
//! one thread at a time is able to access some data.
//!
+//! * [`Notify`][Notify] Basic task notification. `Notify` supports notifying a
+//! receiving task without sending data. In this case, the task wakes up and
+//! resumes processing.
+//!
//! * [`RwLock`][RwLock] Provides a mutual exclusion mechanism which allows
//! multiple readers at the same time, while allowing only one writer at a
//! time. In some cases, this can be more efficient than a mutex.
+//!
+//! * [`Semaphore`][Semaphore] Limits the amount of concurrency. A semaphore
+//! holds a number of permits, which tasks may request in order to enter a
+//! critical section. Semaphores are useful for implementing limiting of
+//! bounding of any kind.
cfg_sync! {
mod barrier;
@@ -421,6 +430,9 @@ cfg_sync! {
mod mutex;
pub use mutex::{Mutex, MutexGuard};
+ mod notify;
+ pub use notify::Notify;
+
pub mod oneshot;
pub(crate) mod semaphore_ll;
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs
new file mode 100644
index 00000000..f3c1bda1
--- /dev/null
+++ b/tokio/src/sync/notify.rs
@@ -0,0 +1,523 @@
+use crate::loom::sync::atomic::AtomicU8;
+use crate::loom::sync::Mutex;
+use crate::util::linked_list::{self, LinkedList};
+
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::SeqCst;
+use std::task::{Context, Poll, Waker};
+
+/// Notify a single task to wake up.
+///
+/// `Notify` provides a basic mechanism to notify a single task of an event.
+/// `Notify` itself does not carry any data. Instead, it is to be used to signal
+/// another task to perform an operation.
+///
+/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits.
+/// [`notified().await`] waits for a permit to become available, and [`notify()`]
+/// sets a permit **if there currently are no available permits**.
+///
+/// The synchronization details of `Notify` are similar to
+/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
+/// value contains a single permit. [`notfied().await`] waits for the permit to
+/// be made available, consumes the permit, and resumes. [`notify()`] sets the
+/// permit, waking a pending task if there is one.
+///
+/// If `notify()` is called **before** `notfied().await`, then the next call to
+/// `notified().await` will complete immediately, consuming the permit. Any
+/// subsequent calls to `notified().await` will wait for a new permit.
+///
+/// If `notify()` is called **multiple** times before `notified().await`, only a
+/// **single** permit is stored. The next call to `notified().await` will
+/// complete immediately, but the one after will wait for a new permit.
+///
+/// # Examples
+///
+/// Basic usage.
+///
+/// ```
+/// use tokio::sync::Notify;
+/// use std::sync::Arc;
+///
+/// #[tokio::main]
+/// async fn main() {
+/// let notify = Arc::new(Notify::new());
+/// let notify2 = notify.clone();
+///
+/// tokio::spawn(async move {
+/// notify2.notified().await;
+/// println!("received notification");
+/// });
+///
+/// println!("sending notification");
+/// notify.notify();
+/// }
+/// ```
+///
+/// Unbound mpsc channel.
+///
+/// ```
+/// use tokio::sync::Notify;
+///
+/// use std::collections::VecDeque;
+/// use std::sync::Mutex;
+///
+/// struct Channel<T> {
+/// values: Mutex<VecDeque<T>>,
+/// notify: Notify,
+/// }
+///
+/// impl<T> Channel<T> {
+/// pub fn send(&self, value: T) {
+/// self.values.lock().unwrap()
+/// .push_back(value);
+///
+/// // Notify the consumer a value is available
+/// self.notify.notify();
+/// }
+///
+/// pub async fn recv(&self) -> T {
+/// loop {
+/// // Drain values
+/// if let Some(value) = self.values.lock().unwrap().pop_front() {
+/// return value;
+/// }
+///
+/// // Wait for values to be available
+/// self.notify.notified().await;
+/// }
+/// }
+/// }
+/// ```
+///
+/// [park]: std::thread::park
+/// [unpark]: std::thread::Thread::unpark
+/// [`notified().await`]: Notify::notified()
+/// [`notify()`]: Notify::notify()
+/// [`Semaphore`]: crate::sync::Semaphore
+#[derive(Debug)]
+pub struct Notify {
+ state: AtomicU8,
+ waiters: Mutex<LinkedList<Waiter>>,
+}
+
+#[derive(Debug)]
+struct Waiter {
+ /// Waiting task's waker
+ waker: Option<Waker>,
+
+ /// `true` if the notification has been assigned to this waiter.
+ notified: bool,
+}
+
+/// Future returned from `notified()`
+#[derive(Debug)]
+struct Notified<'a> {
+ /// The `Notify` being received on.
+ notify: &'a Notify,
+
+ /// The current state of the receiving process.
+ state: State,
+
+ /// Entry in the waiter `LinkedList`.
+ waiter: linked_list::Entry<Waiter>,
+}
+
+#[derive(Debug)]
+enum State {
+ Init,
+ Waiting,
+ Done,
+}
+
+/// Initial "idle" state
+const EMPTY: u8 = 0;
+
+/// One or more threads are currently waiting to be notified.
+const WAITING: u8 = 1;
+
+/// Pending notification
+const NOTIFIED: u8 = 2;
+
+impl Notify {
+ /// Create a new `Notify`, initialized without a permit.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Notify;
+ ///
+ /// let notify = Notify::new();
+ /// ```
+ pub fn new() -> Notify {
+ Notify {
+ state: AtomicU8::new(0),
+ waiters: Mutex::new(LinkedList::new()),
+ }
+ }
+
+ /// Wait for a notification.
+ ///
+ /// Each `Notify` value holds a single permit. If a permit is available from
+ /// an earlier call to [`notify()`], then `notified().await` will complete
+ /// immediately, consuming that permit. Otherwise, `notified().await` waits
+ /// for a permit to be made available by the next call to `notify()`.
+ ///
+ /// [`notify()`]: Notify::notify
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Notify;
+ /// use std::sync::Arc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let notify = Arc::new(Notify::new());
+ /// let notify2 = notify.clone();
+ ///
+ /// tokio::spawn(async move {
+ /// notify2.notified().await;
+ /// println!("received notification");
+ /// });
+ ///
+ /// println!("sending notification");
+ /// notify.notify();
+ /// }
+ /// ```
+ pub async fn notified(&self) {
+ Notified {
+ notify: self,
+ state: State::Init,
+ waiter: linked_list::Entry::new(Waiter {
+ waker: None,
+ notified: false,
+ }),
+ }
+ .await
+ }
+
+ /// Notifies a waiting task
+ ///
+ /// If a task is currently waiting, that task is notified. Otherwise, a
+ /// permit is stored in this `Notify` value and the **next** call to
+ /// [`notified().await`] will complete immediately consuming the permit made
+ /// available by this call to `notify()`.
+ ///
+ /// At most one permit may be stored by `Notify`. Many sequential calls to
+ /// `notify` will result in a single permit being stored. The next call to
+ /// `notified().await` will complete immediately, but the one after that
+ /// will wait.
+ ///
+ /// [`notified().await`]: Notify::notified()
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::sync::Notify;
+ /// use std::sync::Arc;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let notify = Arc::new(Notify::new());
+ /// let notify2 = notify.clone();
+ ///
+ /// tokio::spawn(async move {
+ /// notify2.notified().await;
+ /// println!("received notification");
+ /// });
+ ///
+ /// println!("sending notification");
+ /// notify.notify();
+ /// }
+ /// ```
+ pub fn notify(&self) {
+ // Load the current state
+ let mut curr = self.state.load(SeqCst);
+
+ // If the state is `EMPTY`, transition to `NOTIFIED` and return.
+ while let EMPTY | NOTIFIED = curr {
+ // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
+ // happens-before synchronization must happen between this atomic
+ // operation and a task calling `notified().await`.
+ let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst);
+
+ match res {
+ // No waiters, no further work to do
+ Ok(_) => return,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+
+ // There are waiters, the lock must be acquired to notify.
+ let mut waiters = self.waiters.lock().unwrap();
+
+ // The state must be reloaded while the lock is held. The state may only
+ // transition out of WAITING while the lock is held.
+ curr = self.state.load(SeqCst);
+
+ if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) {
+ drop(waiters);
+ waker.wake();
+ }
+ }
+}
+
+impl Default for Notify {
+ fn default() -> Notify {
+ Notify::new()
+ }
+}
+
+fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -> Option<Waker> {
+ loop {
+ match curr {
+ EMPTY | NOTIFIED => {
+ let res = state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst);
+
+ match res {
+ Ok(_) => return None,
+ Err(actual) => {
+ assert!(actual == EMPTY || actual == NOTIFIED);
+ state.store(NOTIFIED, SeqCst);
+ return None;
+ }
+ }
+ }
+ WAITING => {
+ // At this point, it is guaranteed that the state will not
+ // concurrently change as holding the lock is required to
+ // transition **out** of `WAITING`.
+ //
+ // Get a pending waiter
+ let mut waiter = waiters.pop_back().unwrap();
+
+ assert!(!waiter.notified);
+
+ waiter.notified = true;
+ let waker = waiter.waker.take();
+
+ if waiters.is_empty() {
+ // As this the **final** waiter in the list, the state
+ // must be transitioned to `EMPTY`. As transitioning
+ // **from** `WAITING` requires the lock to be held, a
+ // `store` is sufficient.
+ state.store(EMPTY, SeqCst);
+ }
+
+ return waker;
+ }
+ _ => unreachable!(),
+ }
+ }
+}
+
+// ===== impl Notified =====
+
+impl Notified<'_> {
+ /// A custom `project` implementation is used in place of `pin-project-lite`
+ /// as a custom drop implementation is needed.
+ fn project(
+ self: Pin<&mut Self>,
+ ) -> (&Notify, &mut State, Pin<&mut linked_list::Entry<Waiter>>) {
+ unsafe {
+ // Safety: both `notify` and `state` are `Unpin`.
+
+ is_unpin::<&Notify>();
+ is_unpin::<AtomicU8>();
+
+ let me = self.get_unchecked_mut();
+ (
+ &me.notify,
+ &mut me.state,
+ Pin::new_unchecked(&mut me.waiter),
+ )
+ }
+ }
+}
+
+impl Future for Notified<'_> {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ use State::*;
+
+ let (notify, state, mut waiter) = self.project();
+
+ loop {
+ match *state {
+ Init => {
+ // Optimistically try acquiring a pending notification
+ let res = notify
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst);
+
+ if res.is_ok() {
+ // Acquired the notification
+ *state = Done;
+ return Poll::Ready(());
+ }
+
+ // Acquire the lock and attempt to transition to the waiting
+ // state.
+ let mut waiters = notify.waiters.lock().unwrap();
+
+ // Reload the state with the lock held
+ let mut curr = notify.state.load(SeqCst);
+
+ // Transition the state to WAITING.
+ loop {
+ match curr {
+ EMPTY => {
+ // Transition to WAITING
+ let res = notify
+ .state
+ .compare_exchange(EMPTY, WAITING, SeqCst, SeqCst);
+
+ if let Err(actual) = res {
+ assert_eq!(actual, NOTIFIED);
+ curr = actual;
+ } else {
+ break;
+ }
+ }
+ WAITING => break,
+ NOTIFIED => {
+ // Try consuming the notification
+ let res = notify
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst);
+
+ match res {
+ Ok(_) => {
+ // Acquired the notification
+ *state = Done;
+ return Poll::Ready(());
+ }
+ Err(actual) => {
+ assert_eq!(actual, EMPTY);
+ curr = actual;
+ }
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ // Safety: called while locked.
+ unsafe {
+ (*waiter.as_mut().get()).waker = Some(cx.waker().clone());
+
+ // Insert the waiter into the linked list
+ waiters.push_front(waiter.as_mut());
+ }
+
+ *state = Waiting;
+ }
+ Waiting => {
+ // Currently in the "Waiting" state, implying the caller has
+ // a waiter stored in the waiter list (guarded by
+ // `notify.waiters`). In order to access the waker fields,
+ // we must hold the lock.
+
+ let waiters = notify.waiters.lock().unwrap();
+
+ // Safety: called while locked
+ let w = unsafe { &mut *waiter.as_mut().get() };
+
+ if w.notified {
+ // Our waker has been notified. Reset the fields and
+ // remove it from the list.
+ w.waker = None;
+ w.notified = false;
+
+ *state = Done;
+ } else {
+ // Update the waker, if necessary.
+ if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
+ w.waker = Some(cx.waker().clone());
+ }
+
+ return Poll::Pending;
+ }
+
+ // Explicit drop of the lock to indicate the scope that the
+ // lock is held. Because holding the lock is required to
+ // ensure safe access to fields not held within the lock, it
+ // is helpful to visualize the scope of the critical
+ // section.
+ drop(waiters);
+ }
+ Done => {
+ return Poll::Ready(());
+ }
+ }
+ }
+ }
+}
+
+impl Drop for Notified<'_> {
+ fn drop(&mut self) {
+ use State::*;
+
+ // Safety: The type only transitions to a "Waiting" state when pinned.
+ let (notify, state, mut waiter) = unsafe { Pin::new_unchecked(self).project() };
+
+ // This is where we ensure safety. The `Notified` value is being
+ // dropped, which means we must ensure that the waiter entry is no
+ // longer stored in the linked list.
+ if let Waiting = *state {
+ let mut notify_state = WAITING;
+ let mut waiters = notify.waiters.lock().unwrap();
+
+ // `Notify.state` may be in any of the three states (Empty, Waiting,
+ // Notified). It doesn't actually matter what the atomic is set to
+ // at this point. We hold the lock and will ensure the atomic is in
+ // the correct state once th elock is dropped.
+ //
+ // Because the atomic state is not checked, at first glance, it may
+ // seem like this routine does not handle the case where the
+ // receiver is notified but has not yet observed the notification.
+ // If this happens, no matter how many notifications happen between
+ // this receiver being notified and the receive future dropping, all
+ // we need to do is ensure that one notification is returned back to
+ // the `Notify`. This is done by calling `notify_locked` if `self`
+ // has the `notified` flag set.
+
+ // remove the entry from the list
+ //
+ // safety: the waiter is only added to `waiters` by virtue of it
+ // being the only `LinkedList` available to the type.
+ unsafe { waiters.remove(waiter.as_mut()) };
+
+ if waiters.is_empty() {
+ notify_state = EMPTY;
+ // If the state *should* be `NOTIFIED`, the call to
+ // `notify_locked` below will end up doing the
+ // `store(NOTIFIED)`. If a concurrent receiver races and
+ // observes the incorrect `EMPTY` state, it will then obtain the
+ // lock and block until `notify.state` is in the correct final
+ // state.
+ notify.state.store(EMPTY, SeqCst);
+ }
+
+ // See if the node was notified but not received. In this case, the
+ // notification must be sent to another waiter.
+ //
+ // Safety: with the entry removed from the linked list, there can be
+ // no concurrent access to the entry
+ let notified = unsafe { (*waiter.as_mut().get()).notified };
+
+ if notified {
+ if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
+ drop(waiters);
+ waker.wake();
+ }
+ }
+ }
+ }
+}
+
+fn is_unpin<T: Unpin>() {}
diff --git a/tokio/src/sync/tests/loom_notify.rs b/tokio/src/sync/tests/loom_notify.rs
new file mode 100644
index 00000000..60981d46
--- /dev/null
+++ b/tokio/src/sync/tests/loom_notify.rs
@@ -0,0 +1,90 @@
+use crate::sync::Notify;
+
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+
+#[test]
+fn notify_one() {
+ loom::model(|| {
+ let tx = Arc::new(Notify::new());
+ let rx = tx.clone();
+
+ let th = thread::spawn(move || {
+ block_on(async {
+ rx.notified().await;
+ });
+ });
+
+ tx.notify();
+ th.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_multi() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+
+ let mut ths = vec![];
+
+ for _ in 0..2 {
+ let notify = notify.clone();
+
+ ths.push(thread::spawn(move || {
+ block_on(async {
+ notify.notified().await;
+ notify.notify();
+ })
+ }));
+ }
+
+ notify.notify();
+
+ for th in ths.drain(..) {
+ th.join().unwrap();
+ }
+
+ block_on(async {
+ notify.notified().await;
+ });
+ });
+}
+
+#[test]
+fn notify_drop() {
+ use crate::future::poll_fn;
+ use std::future::Future;
+ use std::task::Poll;
+
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let rx1 = notify.clone();
+ let rx2 = notify.clone();
+
+ let th1 = thread::spawn(move || {
+ let mut recv = Box::pin(rx1.notified());
+
+ block_on(poll_fn(|cx| {
+ if recv.as_mut().poll(cx).is_ready() {
+ rx1.notify();
+ }
+ Poll::Ready(())
+ }));
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ rx2.notified().await;
+ // Trigger second notification
+ rx2.notify();
+ rx2.notified().await;
+ });
+ });
+
+ notify.notify();
+
+ th1.join().unwrap();
+ th2.join().unwrap();
+ });
+}
diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs
index 2ee140cb..7225ce9c 100644
--- a/tokio/src/sync/tests/mod.rs
+++ b/tokio/src/sync/tests/mod.rs
@@ -8,6 +8,7 @@ cfg_loom! {
mod loom_broadcast;
mod loom_list;
mod loom_mpsc;
+ mod loom_notify;
mod loom_oneshot;
mod loom_semaphore_ll;
}
diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs
new file mode 100644
index 00000000..2e4d8d2d
--- /dev/null
+++ b/tokio/src/util/linked_list.rs
@@ -0,0 +1,478 @@
+//! An intrusive double linked list of data
+//!
+//! The data structure supports tracking pinned nodes. Most of the data
+//! structure's APIs are `unsafe` as they require the caller to ensure the
+//! specified node is actually contained by the list.
+
+use core::cell::UnsafeCell;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::ptr::NonNull;
+
+/// An intrusive linked list of nodes, where each node carries associated data
+/// of type `T`.
+#[derive(Debug)]
+pub(crate) struct LinkedList<T> {
+ head: Option<NonNull<Entry<T>>>,
+ tail: Option<NonNull<Entry<T>>>,
+}
+
+unsafe impl<T: Send> Send for LinkedList<T> {}
+unsafe impl<T: Sync> Sync for LinkedList<T> {}
+
+/// A node which carries data of type `T` and is stored in an intrusive list.
+#[derive(Debug)]
+pub(crate) struct Entry<T> {
+ /// The previous node in the list. null if there is no previous node.
+ prev: Option<NonNull<Entry<T>>>,
+
+ /// The next node in the list. null if there is no previous node.
+ next: Option<NonNull<Entry<T>>>,
+
+ /// The data which is associated to this list item
+ data: UnsafeCell<T>,
+
+ /// Prevents `Entry`s from being `Unpin`. They may never be moved, since
+ /// the list semantics require addresses to be stable.
+ _pin: PhantomPinned,
+}
+
+unsafe impl<T: Send> Send for Entry<T> {}
+unsafe impl<T: Sync> Sync for Entry<T> {}
+
+impl<T> LinkedList<T> {
+ /// Creates an empty linked list
+ pub(crate) fn new() -> Self {
+ LinkedList {
+ head: None,
+ tail: None,
+ }
+ }
+
+ /// Adds an item to the back of the linked list.
+ ///
+ /// # Safety
+ ///
+ /// The function is only safe as long as valid pointers are stored inside
+ /// the linked list.
+ pub(crate) unsafe fn push_front(&mut self, entry: Pin<&mut Entry<T>>) {
+ let mut entry: NonNull<Entry<T>> = entry.get_unchecked_mut().into();
+
+ entry.as_mut().next = self.head;
+ entry.as_mut().prev = None;
+
+ if let Some(head) = &mut self.head {
+ head.as_mut().prev = Some(entry);
+ }
+
+ self.head = Some(entry);
+
+ if self.tail.is_none() {
+ self.tail = Some(entry);
+ }
+ }
+
+ /// Removes the first element and returns it, or `None` if the list is empty.
+ ///
+ /// The function is safe as the lifetime of the entry is bound to `&mut
+ /// self`.
+ pub(crate) fn pop_back(&mut self) -> Option<Pin<&mut T>> {
+ unsafe {
+ let mut last = self.tail?;
+ self.tail = last.as_ref().prev;
+
+ if let Some(mut prev) = last.as_mut().prev {
+ prev.as_mut().next = None;
+ } else {
+ self.head = None
+ }
+
+ last.as_mut().prev = None;
+ last.as_mut().next = None;
+
+ let val = &mut *last.as_mut().data.get();
+
+ Some(Pin::new_unchecked(val))
+ }
+ }
+
+ /// Returns whether the linked list doesn not contain any node
+ pub(crate) fn is_empty(&self) -> bool {
+ if self.head.is_some() {
+ return false;
+ }
+
+ assert!(self.tail.is_none());
+ true
+ }
+
+ /// Removes the given item from the linked list.
+ ///
+ /// # Safety
+ ///
+ /// The caller **must** ensure that `entry` is currently contained by
+ /// `self`.
+ pub(crate) unsafe fn remove(&mut self, entry: Pin<&mut Entry<T>>) -> bool {
+ let mut entry = NonNull::from(entry.get_unchecked_mut());
+
+ if let Some(mut prev) = entry.as_mut().prev {
+ debug_assert_eq!(prev.as_ref().next, Some(entry));
+ prev.as_mut().next = entry.as_ref().next;
+ } else {
+ if self.head != Some(entry) {
+ return false;
+ }
+
+ self.head = entry.as_ref().next;
+ }
+
+ if let Some(mut next) = entry.as_mut().next {
+ debug_assert_eq!(next.as_ref().prev, Some(entry));
+ next.as_mut().prev = entry.as_ref().prev;
+ } else {
+ // This might be the last item in the list
+ if self.tail != Some(entry) {
+ return false;
+ }
+
+ self.tail = entry.as_ref().prev;
+ }
+
+ entry.as_mut().next = None;
+ entry.as_mut().prev = None;
+
+ true
+ }
+}
+
+impl<T> Entry<T> {
+ /// Creates a new node with the associated data
+ pub(crate) fn new(data: T) -> Entry<T> {
+ Entry {
+ prev: None,
+ next: None,
+ data: UnsafeCell::new(data),
+ _pin: PhantomPinned,
+ }
+ }
+
+ /// Get a raw pointer to the inner data
+ pub(crate) fn get(&self) -> *mut T {
+ self.data.get()
+ }
+}
+
+#[cfg(test)]
+#[cfg(not(loom))]
+mod tests {
+ use super::*;
+
+ fn collect_list<T: Copy>(list: &mut LinkedList<T>) -> Vec<T> {
+ let mut ret = vec![];
+
+ while let Some(v) = list.pop_back() {
+ ret.push(*v);
+ }
+
+ ret
+ }
+
+ unsafe fn push_all(list: &mut LinkedList<i32>, entries: &mut [Pin<&mut Entry<i32>>]) {
+ for entry in entries.iter_mut() {
+ list.push_front(entry.as_mut());
+ }
+ }
+
+ macro_rules! assert_clean {
+ ($e:ident) => {{
+ assert!($e.next.is_none());
+ assert!($e.prev.is_none());
+ }};
+ }
+
+ macro_rules! assert_ptr_eq {
+ ($a:expr, $b:expr) => {{
+ // Deal with mapping a Pin<&mut T> -> Option<NonNull<T>>
+ assert_eq!(Some($a.as_mut().get_unchecked_mut().into()), $b)
+ }};
+ }
+
+ #[test]
+ fn push_and_drain() {
+ pin! {
+ let a = Entry::new(5);
+ let b = Entry::new(7);
+ let c = Entry::new(31);
+ }
+
+ let mut list = LinkedList::new();
+ assert!(list.is_empty());
+
+ unsafe {
+ list.push_front(a);
+ assert!(!list.is_empty());
+ list.push_front(b);
+ list.push_front(c);
+ }
+
+ let items: Vec<i32> = collect_list(&mut list);
+ assert_eq!([5, 7, 31].to_vec(), items);
+
+ assert!(list.is_empty());
+ }
+
+ #[test]
+ fn push_pop_push_pop() {
+ pin! {
+ let a = Entry::new(5);
+ let b = Entry::new(7);
+ }
+
+ let mut list = LinkedList::new();
+
+ unsafe {
+ list.push_front(a);
+ }
+
+ let v = list.pop_back().unwrap();
+ assert_eq!(5, *v);
+ assert!(list.is_empty());
+
+ unsafe {
+ list.push_front(b);
+ }
+
+ let v = list.pop_back().unwrap();
+ assert_eq!(7, *v);
+
+ assert!(list.is_empty());
+ assert!(list.pop_back().is_none());
+ }
+
+ #[test]
+ fn remove_by_address() {
+ pin! {
+ let a = Entry::new(5);
+ let b = Entry::new(7);
+ let c = Entry::new(31);
+ }
+
+ unsafe {
+ // Remove first
+ let mut list = LinkedList::new();
+
+ push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
+ assert!(list.remove(a.as_mut()));
+ assert_clean!(a);
+ // `a` should be no longer there and can't be removed twice
+ assert!(!list.remove(a.as_mut()));
+ assert!(!list.is_empty());
+
+ assert!(list.remove(b.as_mut()));
+ assert_clean!(b);
+ // `b` should be no longer there and can't be removed twice
+ assert!(!list.remove(b.as_mut()));
+ assert!(!list.is_empty());
+
+ assert!(list.remove(c.as_mut()));
+ assert_clean!(c);
+ // `b` should be no longer there and can't be removed twice
+ assert!(!list.remove(c.as_mut()));
+ assert!(list.is_empty());
+ }
+
+ unsafe {
+ // Remove middle
+ let mut list = LinkedList::new();
+
+ push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
+
+ assert!(list.remove(a.as_mut()));
+ assert_clean!(a);
+
+ assert_ptr_eq!(b, list.head);
+ assert_ptr_eq!(c, b.next);
+ assert_ptr_eq!(b, c.prev);
+
+ let items = collect_list(&mut list);
+ assert_eq!([31, 7].to_vec(), items);
+ }
+
+ unsafe {
+ // Remove middle
+ let mut list = LinkedList::new();
+
+ push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
+
+ assert!(list.remove(b.as_mut()));
+ assert_clean!(b);
+
+ assert_ptr_eq!(c, a.next);
+ assert_ptr_eq!(a, c.prev);
+
+ let items = collect_list(&mut list);
+ assert_eq!([31, 5].to_vec(), items);
+ }
+
+ unsafe {
+ // Remove last
+ // Remove middle
+ let mut list = LinkedList::new();
+
+ push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]);
+
+ assert!(list.remove(c.as_mut()));
+ assert_clean!(c);
+
+ assert!(b.next.is_none());
+ assert_ptr_eq!(b, list.tail);
+
+ let items = collect_list(&mut list);
+ assert_eq!([7, 5].to_vec(), items);
+ }
+
+ unsafe {
+ // Remove first of two
+ let mut list = LinkedList::new();
+
+ push_all(&mut list, &mut [b.as_mut(), a.as_mut()]);
+
+ assert!(list.remove(a.as_mut()));
+
+ assert_clean!(a);
+
+ // a should be no longer there and can't be removed twice
+ assert!(!list.remove(a.as_mut()));
+
+ assert_ptr_eq!(b, list.head);
+ assert_ptr_eq!(b, list.tail);