summaryrefslogtreecommitdiffstats
path: root/tokio/src/sync/notify.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-02-27 13:45:28 -0800
committerGitHub <noreply@github.com>2020-02-27 13:45:28 -0800
commit06bcbe8dcf52a464d8e89866449397df1c794338 (patch)
tree4cbec9a7a4c451e3eb2fd8894e208e81ed30a962 /tokio/src/sync/notify.rs
parentbfdfb46fcd4fca0aa9bf517d3125bfb4e011449a (diff)
sync: refactor intrusive linked list (#2279)
Allow storing the intrusive linked-list pointers in an arbitrary location in the node. This is in preparation for using the linked list in the scheduler. In order to make using the intrusive linked list more flexible, a trait is introduced to abstract mapping an entry to raw pointers and the next / prev pointers. This also pushes more unsafety onto the user.
Diffstat (limited to 'tokio/src/sync/notify.rs')
-rw-r--r--tokio/src/sync/notify.rs72
1 files changed, 52 insertions, 20 deletions
diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs
index f3c1bda1..ef34ad2d 100644
--- a/tokio/src/sync/notify.rs
+++ b/tokio/src/sync/notify.rs
@@ -2,8 +2,11 @@ use crate::loom::sync::atomic::AtomicU8;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
+use std::cell::UnsafeCell;
use std::future::Future;
+use std::marker::PhantomPinned;
use std::pin::Pin;
+use std::ptr::NonNull;
use std::sync::atomic::Ordering::SeqCst;
use std::task::{Context, Poll, Waker};
@@ -103,11 +106,17 @@ pub struct Notify {
#[derive(Debug)]
struct Waiter {
+ /// Intrusive linked-list pointers
+ pointers: linked_list::Pointers<Waiter>,
+
/// Waiting task's waker
waker: Option<Waker>,
/// `true` if the notification has been assigned to this waiter.
notified: bool,
+
+ /// Should not be `Unpin`.
+ _p: PhantomPinned,
}
/// Future returned from `notified()`
@@ -120,9 +129,12 @@ struct Notified<'a> {
state: State,
/// Entry in the waiter `LinkedList`.
- waiter: linked_list::Entry<Waiter>,
+ waiter: UnsafeCell<Waiter>,
}
+unsafe impl<'a> Send for Notified<'a> {}
+unsafe impl<'a> Sync for Notified<'a> {}
+
#[derive(Debug)]
enum State {
Init,
@@ -189,9 +201,11 @@ impl Notify {
Notified {
notify: self,
state: State::Init,
- waiter: linked_list::Entry::new(Waiter {
+ waiter: UnsafeCell::new(Waiter {
+ pointers: linked_list::Pointers::new(),
waker: None,
notified: false,
+ _p: PhantomPinned,
}),
}
.await
@@ -292,7 +306,10 @@ fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -
// transition **out** of `WAITING`.
//
// Get a pending waiter
- let mut waiter = waiters.pop_back().unwrap();
+ let waiter = waiters.pop_back().unwrap();
+
+ // Safety: `waiters` lock is still held.
+ let waiter = unsafe { &mut *waiter };
assert!(!waiter.notified);
@@ -319,9 +336,7 @@ fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -
impl Notified<'_> {
/// A custom `project` implementation is used in place of `pin-project-lite`
/// as a custom drop implementation is needed.
- fn project(
- self: Pin<&mut Self>,
- ) -> (&Notify, &mut State, Pin<&mut linked_list::Entry<Waiter>>) {
+ fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) {
unsafe {
// Safety: both `notify` and `state` are `Unpin`.
@@ -329,11 +344,7 @@ impl Notified<'_> {
is_unpin::<AtomicU8>();
let me = self.get_unchecked_mut();
- (
- &me.notify,
- &mut me.state,
- Pin::new_unchecked(&mut me.waiter),
- )
+ (&me.notify, &mut me.state, &me.waiter)
}
}
}
@@ -344,7 +355,7 @@ impl Future for Notified<'_> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
use State::*;
- let (notify, state, mut waiter) = self.project();
+ let (notify, state, waiter) = self.project();
loop {
match *state {
@@ -408,12 +419,12 @@ impl Future for Notified<'_> {
// Safety: called while locked.
unsafe {
- (*waiter.as_mut().get()).waker = Some(cx.waker().clone());
-
- // Insert the waiter into the linked list
- waiters.push_front(waiter.as_mut());
+ (*waiter.get()).waker = Some(cx.waker().clone());
}
+ // Insert the waiter into the linked list
+ waiters.push_front(waiter.get());
+
*state = Waiting;
}
Waiting => {
@@ -425,7 +436,7 @@ impl Future for Notified<'_> {
let waiters = notify.waiters.lock().unwrap();
// Safety: called while locked
- let w = unsafe { &mut *waiter.as_mut().get() };
+ let w = unsafe { &mut *waiter.get() };
if w.notified {
// Our waker has been notified. Reset the fields and
@@ -463,7 +474,7 @@ impl Drop for Notified<'_> {
use State::*;
// Safety: The type only transitions to a "Waiting" state when pinned.
- let (notify, state, mut waiter) = unsafe { Pin::new_unchecked(self).project() };
+ let (notify, state, waiter) = unsafe { Pin::new_unchecked(self).project() };
// This is where we ensure safety. The `Notified` value is being
// dropped, which means we must ensure that the waiter entry is no
@@ -490,7 +501,7 @@ impl Drop for Notified<'_> {
//
// safety: the waiter is only added to `waiters` by virtue of it
// being the only `LinkedList` available to the type.
- unsafe { waiters.remove(waiter.as_mut()) };
+ unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
if waiters.is_empty() {
notify_state = EMPTY;
@@ -508,7 +519,7 @@ impl Drop for Notified<'_> {
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
- let notified = unsafe { (*waiter.as_mut().get()).notified };
+ let notified = unsafe { (*waiter.get()).notified };
if notified {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
@@ -520,4 +531,25 @@ impl Drop for Notified<'_> {
}
}
+/// # Safety
+///
+/// `Waiter` is forced to be !Unpin.
+unsafe impl linked_list::Link for Waiter {
+ type Handle = *mut Waiter;
+ type Target = Waiter;
+
+ fn to_raw(handle: *mut Waiter) -> NonNull<Waiter> {
+ debug_assert!(!handle.is_null());
+ unsafe { NonNull::new_unchecked(handle) }
+ }
+
+ unsafe fn from_raw(ptr: NonNull<Waiter>) -> *mut Waiter {
+ ptr.as_ptr()
+ }
+
+ unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
+ NonNull::from(&mut target.as_mut().pointers)
+ }
+}
+
fn is_unpin<T: Unpin>() {}