summaryrefslogtreecommitdiffstats
path: root/tokio/src/util
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-03-05 10:31:37 -0800
committerGitHub <noreply@github.com>2020-03-05 10:31:37 -0800
commita78b1c65ccfb9692ca5d3ed8ddde934f40091d83 (patch)
treec88e547d6913b204f590aea54dc03328ee3cb094 /tokio/src/util
parent5ede2e4d6b2f732e83e33f9693682dffc6c9f5b0 (diff)
rt: cleanup and simplify scheduler (scheduler v2.5) (#2273)
A refactor of the scheduler internals focusing on simplifying and reducing unsafety. There are no fundamental logic changes. * The state transitions of the core task component are refined and reduced. * `basic_scheduler` has most unsafety removed. * `local_set` has most unsafety removed. * `threaded_scheduler` limits most unsafety to its queue implementation.
Diffstat (limited to 'tokio/src/util')
-rw-r--r--tokio/src/util/linked_list.rs105
-rw-r--r--tokio/src/util/mod.rs13
-rw-r--r--tokio/src/util/try_lock.rs21
-rw-r--r--tokio/src/util/wake.rs83
4 files changed, 186 insertions, 36 deletions
diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs
index 57540c4a..07c25fe9 100644
--- a/tokio/src/util/linked_list.rs
+++ b/tokio/src/util/linked_list.rs
@@ -4,6 +4,7 @@
//! structure's APIs are `unsafe` as they require the caller to ensure the
//! specified node is actually contained by the list.
+use core::mem::ManuallyDrop;
use core::ptr::NonNull;
/// An intrusive linked list.
@@ -41,10 +42,8 @@ pub(crate) unsafe trait Link {
/// Node type
type Target;
- /// Convert the handle to a raw pointer
- ///
- /// Consumes ownership of the handle.
- fn to_raw(handle: Self::Handle) -> NonNull<Self::Target>;
+ /// Convert the handle to a raw pointer without consuming the handle
+ fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target>;
/// Convert the raw pointer to a handle
unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle;
@@ -79,7 +78,9 @@ impl<T: Link> LinkedList<T> {
/// Adds an element first in the list.
pub(crate) fn push_front(&mut self, val: T::Handle) {
- let ptr = T::to_raw(val);
+ // The value should not be dropped, it is being inserted into the list
+ let val = ManuallyDrop::new(val);
+ let ptr = T::as_raw(&*val);
unsafe {
T::pointers(ptr).as_mut().next = self.head;
@@ -133,13 +134,13 @@ impl<T: Link> LinkedList<T> {
///
/// The caller **must** ensure that `node` is currently contained by
/// `self` or not contained by any other list.
- pub(crate) unsafe fn remove(&mut self, node: NonNull<T::Target>) -> bool {
+ pub(crate) unsafe fn remove(&mut self, node: NonNull<T::Target>) -> Option<T::Handle> {
if let Some(prev) = T::pointers(node).as_ref().prev {
debug_assert_eq!(T::pointers(prev).as_ref().next, Some(node));
T::pointers(prev).as_mut().next = T::pointers(node).as_ref().next;
} else {
if self.head != Some(node) {
- return false;
+ return None;
}
self.head = T::pointers(node).as_ref().next;
@@ -151,7 +152,7 @@ impl<T: Link> LinkedList<T> {
} else {
// This might be the last item in the list
if self.tail != Some(node) {
- return false;
+ return None;
}
self.tail = T::pointers(node).as_ref().prev;
@@ -160,7 +161,40 @@ impl<T: Link> LinkedList<T> {
T::pointers(node).as_mut().next = None;
T::pointers(node).as_mut().prev = None;
- true
+ Some(T::from_raw(node))
+ }
+}
+
+// ===== impl Iter =====
+
+cfg_rt_threaded! {
+ use core::marker::PhantomData;
+
+ pub(crate) struct Iter<'a, T: Link> {
+ curr: Option<NonNull<T::Target>>,
+ _p: PhantomData<&'a T>,
+ }
+
+ impl<T: Link> LinkedList<T> {
+ pub(crate) fn iter(&self) -> Iter<'_, T> {
+ Iter {
+ curr: self.head,
+ _p: PhantomData,
+ }
+ }
+ }
+
+ impl<'a, T: Link> Iterator for Iter<'a, T> {
+ type Item = &'a T::Target;
+
+ fn next(&mut self) -> Option<&'a T::Target> {
+ let curr = self.curr?;
+ // safety: the pointer references data contained by the list
+ self.curr = unsafe { T::pointers(curr).as_ref() }.next;
+
+ // safety: the value is still owned by the linked list.
+ Some(unsafe { &*curr.as_ptr() })
+ }
}
}
@@ -192,7 +226,7 @@ mod tests {
type Handle = Pin<&'a Entry>;
type Target = Entry;
- fn to_raw(handle: Pin<&'_ Entry>) -> NonNull<Entry> {
+ fn as_raw(handle: &Pin<&'_ Entry>) -> NonNull<Entry> {
NonNull::from(handle.get_ref())
}
@@ -299,22 +333,22 @@ mod tests {
let mut list = LinkedList::new();
push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]);
- assert!(list.remove(ptr(&a)));
+ assert!(list.remove(ptr(&a)).is_some());
assert_clean!(a);
// `a` should no longer be there and can't be removed twice
- assert!(!list.remove(ptr(&a)));
+ assert!(list.remove(ptr(&a)).is_none());
assert!(!list.is_empty());
- assert!(list.remove(ptr(&b)));
+ assert!(list.remove(ptr(&b)).is_some());
assert_clean!(b);
// `b` should no longer be there and can't be removed twice
- assert!(!list.remove(ptr(&b)));
+ assert!(list.remove(ptr(&b)).is_none());
assert!(!list.is_empty());
- assert!(list.remove(ptr(&c)));
+ assert!(list.remove(ptr(&c)).is_some());
assert_clean!(c);
// `c` should no longer be there and can't be removed twice
- assert!(!list.remove(ptr(&c)));
+ assert!(list.remove(ptr(&c)).is_none());
assert!(list.is_empty());
}
@@ -324,7 +358,7 @@ mod tests {
push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]);
- assert!(list.remove(ptr(&a)));
+ assert!(list.remove(ptr(&a)).is_some());
assert_clean!(a);
assert_ptr_eq!(b, list.head);
@@ -341,7 +375,7 @@ mod tests {
push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]);
- assert!(list.remove(ptr(&b)));
+ assert!(list.remove(ptr(&b)).is_some());
assert_clean!(b);
assert_ptr_eq!(c, a.pointers.next);
@@ -358,7 +392,7 @@ mod tests {
push_all(&mut list, &[c.as_ref(), b.as_ref(), a.as_ref()]);
- assert!(list.remove(ptr(&c)));
+ assert!(list.remove(ptr(&c)).is_some());
assert_clean!(c);
assert!(b.pointers.next.is_none());
@@ -374,12 +408,12 @@ mod tests {
push_all(&mut list, &[b.as_ref(), a.as_ref()]);
- assert!(list.remove(ptr(&a)));
+ assert!(list.remove(ptr(&a)).is_some());
assert_clean!(a);
// a should no longer be there and can't be removed twice
- assert!(!list.remove(ptr(&a)));
+ assert!(list.remove(ptr(&a)).is_none());
assert_ptr_eq!(b, list.head);
assert_ptr_eq!(b, list.tail);
@@ -397,7 +431,7 @@ mod tests {
push_all(&mut list, &[b.as_ref(), a.as_ref()]);
- assert!(list.remove(ptr(&b)));
+ assert!(list.remove(ptr(&b)).is_some());
assert_clean!(b);
@@ -417,7 +451,7 @@ mod tests {
push_all(&mut list, &[a.as_ref()]);
- assert!(list.remove(ptr(&a)));
+ assert!(list.remove(ptr(&a)).is_some());
assert_clean!(a);
assert!(list.head.is_none());
@@ -433,10 +467,28 @@ mod tests {
list.push_front(b.as_ref());
list.push_front(a.as_ref());
- assert!(!list.remove(ptr(&c)));
+ assert!(list.remove(ptr(&c)).is_none());
}
}
+ #[test]
+ fn iter() {
+ let a = entry(5);
+ let b = entry(7);
+
+ let mut list = LinkedList::<&Entry>::new();
+
+ assert_eq!(0, list.iter().count());
+
+ list.push_front(a.as_ref());
+ list.push_front(b.as_ref());
+
+ let mut i = list.iter();
+ assert_eq!(7, i.next().unwrap().val);
+ assert_eq!(5, i.next().unwrap().val);
+ assert!(i.next().is_none());
+ }
+
proptest::proptest! {
#[test]
fn fuzz_linked_list(ops: Vec<usize>) {
@@ -493,10 +545,11 @@ mod tests {
}
let idx = n % reference.len();
- let v = reference.remove(idx).unwrap();
+ let expect = reference.remove(idx).unwrap();
unsafe {
- assert!(ll.remove(ptr(&entries[v as usize])));
+ let entry = ll.remove(ptr(&entries[expect as usize])).unwrap();
+ assert_eq!(expect, entry.val);
}
}
}
diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs
index 2761f725..c2f572f1 100644
--- a/tokio/src/util/mod.rs
+++ b/tokio/src/util/mod.rs
@@ -3,17 +3,18 @@ cfg_io_driver! {
pub(crate) mod slab;
}
-cfg_sync! {
- pub(crate) mod linked_list;
-}
+#[cfg(any(feature = "sync", feature = "rt-core"))]
+pub(crate) mod linked_list;
#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]
mod rand;
-cfg_rt_threaded! {
- mod pad;
- pub(crate) use pad::CachePadded;
+cfg_rt_core! {
+ mod wake;
+ pub(crate) use wake::{waker_ref, Wake};
+}
+cfg_rt_threaded! {
pub(crate) use rand::FastRand;
mod try_lock;
diff --git a/tokio/src/util/try_lock.rs b/tokio/src/util/try_lock.rs
index a42e750b..8b0edb4a 100644
--- a/tokio/src/util/try_lock.rs
+++ b/tokio/src/util/try_lock.rs
@@ -20,13 +20,26 @@ unsafe impl<T: Send> Sync for TryLock<T> {}
unsafe impl<T: Sync> Sync for LockGuard<'_, T> {}
-impl<T> TryLock<T> {
- /// Create a new `TryLock`
- pub(crate) fn new(data: T) -> TryLock<T> {
+macro_rules! new {
+ ($data:ident) => {
TryLock {
locked: AtomicBool::new(false),
- data: UnsafeCell::new(data),
+ data: UnsafeCell::new($data),
}
+ };
+}
+
+impl<T> TryLock<T> {
+ #[cfg(not(loom))]
+ /// Create a new `TryLock`
+ pub(crate) const fn new(data: T) -> TryLock<T> {
+ new!(data)
+ }
+
+ #[cfg(loom)]
+ /// Create a new `TryLock`
+ pub(crate) fn new(data: T) -> TryLock<T> {
+ new!(data)
}
/// Attempt to acquire lock
diff --git a/tokio/src/util/wake.rs b/tokio/src/util/wake.rs
new file mode 100644
index 00000000..e49f1e89
--- /dev/null
+++ b/tokio/src/util/wake.rs
@@ -0,0 +1,83 @@
+use std::marker::PhantomData;
+use std::mem::ManuallyDrop;
+use std::ops::Deref;
+use std::sync::Arc;
+use std::task::{RawWaker, RawWakerVTable, Waker};
+
+/// Simplified waking interface based on Arcs
+pub(crate) trait Wake: Send + Sync {
+ /// Wake by value
+ fn wake(self: Arc<Self>);
+
+ /// Wake by reference
+ fn wake_by_ref(arc_self: &Arc<Self>);
+}
+
+/// A `Waker` that is only valid for a given lifetime.
+#[derive(Debug)]
+pub(crate) struct WakerRef<'a> {
+ waker: ManuallyDrop<Waker>,
+ _p: PhantomData<&'a ()>,
+}
+
+impl Deref for WakerRef<'_> {
+ type Target = Waker;
+
+ fn deref(&self) -> &Waker {
+ &self.waker
+ }
+}
+
+/// Creates a reference to a `Waker` from a reference to `Arc<impl Wake>`.
+pub(crate) fn waker_ref<W: Wake>(wake: &Arc<W>) -> WakerRef<'_> {
+ let ptr = &**wake as *const _ as *const ();
+
+ let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) };
+
+ WakerRef {
+ waker: ManuallyDrop::new(waker),
+ _p: PhantomData,
+ }
+}
+
+fn waker_vtable<W: Wake>() -> &'static RawWakerVTable {
+ &RawWakerVTable::new(
+ clone_arc_raw::<W>,
+ wake_arc_raw::<W>,
+ wake_by_ref_arc_raw::<W>,
+ drop_arc_raw::<W>,
+ )
+}
+
+unsafe fn inc_ref_count<T: Wake>(data: *const ()) {
+ // Retain Arc, but don't touch refcount by wrapping in ManuallyDrop
+ let arc = ManuallyDrop::new(Arc::<T>::from_raw(data as *const T));
+
+ // Now increase refcount, but don't drop new refcount either
+ let arc_clone: ManuallyDrop<_> = arc.clone();
+
+ // Drop explicitly to avoid clippy warnings
+ drop(arc);
+ drop(arc_clone);
+}
+
+unsafe fn clone_arc_raw<T: Wake>(data: *const ()) -> RawWaker {
+ inc_ref_count::<T>(data);
+ RawWaker::new(data, waker_vtable::<T>())
+}
+
+unsafe fn wake_arc_raw<T: Wake>(data: *const ()) {
+ let arc: Arc<T> = Arc::from_raw(data as *const T);
+ Wake::wake(arc);
+}
+
+// used by `waker_ref`
+unsafe fn wake_by_ref_arc_raw<T: Wake>(data: *const ()) {
+ // Retain Arc, but don't touch refcount by wrapping in ManuallyDrop
+ let arc = ManuallyDrop::new(Arc::<T>::from_raw(data as *const T));
+ Wake::wake_by_ref(&arc);
+}
+
+unsafe fn drop_arc_raw<T: Wake>(data: *const ()) {
+ drop(Arc::<T>::from_raw(data as *const T))
+}