author     Eliza Weisman <eliza@buoyant.io>    2020-03-23 13:45:48 -0700
committer  GitHub <noreply@github.com>         2020-03-23 13:45:48 -0700
commit     acf8a7da7a64bf08d578db9a9836a8e061765314 (patch)
tree       a5c8fe9e0a4222eb44232613da10255e6cbd7bc8 /tokio/src/util
parent     2258de51477cb36d5b69becd6058b94e4a8fc641 (diff)
sync: new internal semaphore based on intrusive lists (#2325)
## Motivation
Many of Tokio's synchronization primitives (`RwLock`, `Mutex`,
`Semaphore`, and the bounded MPSC channel) are based on the internal
semaphore implementation, called `semaphore_ll`. This semaphore type
provides a lower-level internal API than the public `Semaphore` type,
and supports "batch" operations: waiters may acquire more than one
permit at a time, and batches of permits may be released back to the
semaphore.
Currently, `semaphore_ll` uses an atomic singly-linked list for the
waiter queue, with a linked list implementation that is specific to the
semaphore. This implementation requires a heap allocation for every
waiter in the queue. These allocations are owned by the semaphore,
rather than by the task awaiting permits from the semaphore. Critically,
they are only _deallocated_ when permits are released back to the
semaphore, at which point it dequeues as many waiters from the front of
the queue as can be satisfied with the released permits. If a task
attempts to acquire permits from the semaphore and is cancelled (such as
by timing out), its waiter node remains in the list until it is
dequeued while permits are released. In cases where large numbers of tasks
are cancelled while waiting for permits, this results in extremely high
memory use for the semaphore (see #2237).
## Solution
@Matthias247 has proposed that Tokio adopt the approach used in his
`futures-intrusive` crate: using an _intrusive_ linked list to store the
wakers of tasks waiting on a synchronization primitive. In an intrusive
list, each list node is stored as part of the entry that node
represents, rather than in a heap allocation that owns the entry.
Because futures must be pinned in order to be polled, the necessary
invariant of such a list (that entries may not move while in the list)
may be upheld by making the waiter node `!Unpin`. In this approach,
the waiter node can be stored inline in the future, rather than
requiring separate heap allocation, and cancelled futures may remove
their nodes from the list.
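To illustrate the intrusive layout (a standalone sketch with hypothetical types, not Tokio's actual `Waiter`), the queue node can live directly inside the waiting task's state, with `PhantomPinned` making the type `!Unpin` so a pinned node's address stays stable while it is linked:

```rust
use std::marker::PhantomPinned;
use std::ptr::NonNull;

// Hypothetical waiter: the queue link (`next`) is stored inline, so no
// heap allocation is needed to enqueue this waiter. `PhantomPinned`
// makes the type !Unpin, so a pinned waiter cannot move while linked.
struct Waiter {
    next: Option<NonNull<Waiter>>,
    permits_needed: usize,
    _pin: PhantomPinned,
}

fn main() {
    // Two waiters on the stack; the "list" threads through them without
    // owning or allocating anything.
    let mut b = Waiter { next: None, permits_needed: 2, _pin: PhantomPinned };
    let a = Waiter {
        next: NonNull::new(&mut b as *mut Waiter),
        permits_needed: 1,
        _pin: PhantomPinned,
    };

    // Walk the queue, summing the permits requested by each waiter.
    let mut total = 0;
    let mut curr = NonNull::new(&a as *const Waiter as *mut Waiter);
    while let Some(node) = curr {
        // safety: both nodes outlive this traversal
        let node = unsafe { node.as_ref() };
        total += node.permits_needed;
        curr = node.next;
    }
    assert_eq!(total, 3);
    println!("total permits requested: {}", total);
}
```

Tokio's actual list is doubly linked (`prev` and `next` pointers), which is what lets a cancelled waiter unlink itself in O(1).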
This branch adds a new semaphore implementation that uses the intrusive
list added to Tokio in #2210. The implementation is essentially a hybrid
of the old `semaphore_ll` and the semaphore used in `futures-intrusive`:
while a `Mutex` around the wait list is necessary, since the intrusive
list is not thread-safe, the permit state is stored outside of the mutex
and updated atomically.
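A minimal sketch of what an atomically-updated permit count could look like (hypothetical `PermitState` type, not Tokio's actual code, which also packs flag bits into the same word): the fast path is a compare-and-swap loop that never touches the wait-list mutex.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Simplified sketch: the permit count lives in an atomic outside the
// mutex, so an uncontended acquire never locks.
struct PermitState {
    permits: AtomicUsize,
}

impl PermitState {
    /// Try to acquire `n` permits without touching the wait-list lock.
    fn try_acquire(&self, n: usize) -> bool {
        let mut curr = self.permits.load(Ordering::Acquire);
        loop {
            if curr < n {
                // Not enough permits: the caller would now take the
                // mutex and enqueue a waiter node.
                return false;
            }
            match self.permits.compare_exchange_weak(
                curr,
                curr - n,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                // Raced with another task; retry with the fresh value.
                Err(actual) => curr = actual,
            }
        }
    }
}

fn main() {
    let state = PermitState { permits: AtomicUsize::new(2) };
    assert!(state.try_acquire(1));
    assert!(state.try_acquire(1));
    assert!(!state.try_acquire(1)); // would fall back to the slow path
    println!("ok");
}
```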
The mutex is acquired only when accessing the wait list; if a task
can acquire sufficient permits without waiting, it does not need to
acquire the lock. When releasing permits, we iterate over the wait
list from the end of the queue until we run out of permits to release,
and split off all the nodes that received enough permits to wake up
into a separate list. Then, we can drain the new list and notify those
wakers *after* releasing the lock. Because the split operation only
modifies the pointers on the head node of the split-off list and the
new tail node of the old list, it is O(1) and does not require an
allocation to return a variable number of waiters to notify.
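The release walk can be sketched with a plain `VecDeque` standing in for the intrusive list (a hypothetical simplification: the real implementation splits all satisfied waiters off in a single O(1) `split_back` rather than popping them one at a time, and it wakes them only after dropping the lock):

```rust
use std::collections::VecDeque;

// Walk from the back of the queue (the oldest waiters, since new
// waiters are pushed at the front), assign permits in FIFO order, and
// collect the fully satisfied waiters so they can be woken later,
// after the lock is released. Each entry is a waiter's permit request.
fn release(queue: &mut VecDeque<usize>, mut permits: usize) -> Vec<usize> {
    let mut woken = Vec::new();
    while let Some(&needed) = queue.back() {
        if needed > permits {
            break; // the next waiter needs more than we have left
        }
        permits -= needed;
        woken.push(queue.pop_back().unwrap());
    }
    woken
}

fn main() {
    // Waiters requesting 3, 2, and 1 permits; 1 is the oldest (at the back).
    let mut queue: VecDeque<usize> = VecDeque::from(vec![3, 2, 1]);
    let woken = release(&mut queue, 3);
    assert_eq!(woken, vec![1, 2]); // enough permits for the two oldest
    assert_eq!(queue, VecDeque::from(vec![3]));
    println!("woke {} waiters", woken.len());
}
```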
Because of the intrusive list invariants, the API provided by the new
`batch_semaphore` is somewhat different from that of `semaphore_ll`. In
particular, the `Permit` type has been removed. This type was primarily
intended to allow the reuse of a wait list node allocated on the heap.
Since the intrusive list means we can avoid heap-allocating waiters,
this is no longer necessary. Instead, acquiring permits is done by
polling an `Acquire` future returned by the `Semaphore` type. The use of
a future here ensures that the waiter node is always pinned while
waiting to acquire permits, and that a reference to the semaphore is
available to remove the waiter if the future is cancelled.
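A toy model of why holding a semaphore reference in the future helps with cancellation (all types here are hypothetical; a `Mutex<Vec<usize>>` stands in for the locked intrusive wait list, and `Drop` stands in for the future being cancelled):

```rust
use std::sync::Mutex;

// Toy semaphore: the wait queue is a mutex-protected Vec of waiter ids.
struct Semaphore {
    waiters: Mutex<Vec<usize>>,
}

// Simplified stand-in for an `Acquire`-style future: it keeps a
// reference to its semaphore so that, if dropped while queued (i.e.
// cancelled), it can remove its own waiter entry from the queue.
struct Acquire<'a> {
    semaphore: &'a Semaphore,
    id: usize,
    queued: bool,
}

impl Drop for Acquire<'_> {
    fn drop(&mut self) {
        if self.queued {
            // Lock the wait list and unlink this waiter, so a cancelled
            // acquire leaves nothing behind in the queue.
            let mut waiters = self.semaphore.waiters.lock().unwrap();
            waiters.retain(|&w| w != self.id);
        }
    }
}

fn main() {
    let sem = Semaphore { waiters: Mutex::new(Vec::new()) };
    {
        let acq = Acquire { semaphore: &sem, id: 7, queued: true };
        sem.waiters.lock().unwrap().push(acq.id);
        assert_eq!(sem.waiters.lock().unwrap().len(), 1);
        // `acq` is dropped here, simulating cancellation (e.g. a timeout).
    }
    // The cancelled waiter removed itself from the queue on drop.
    assert!(sem.waiters.lock().unwrap().is_empty());
    println!("ok");
}
```

This is exactly the leak the old `semaphore_ll` could not avoid: without a handle back to the semaphore at drop time, a cancelled waiter's node had to sit in the queue until a later release dequeued it.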
Unfortunately, the current implementation of the bounded MPSC requires a
`poll_acquire` operation, and has methods that call it while outside of
a pinned context. Therefore, I've left the old `semaphore_ll`
implementation in place to be used by the bounded MPSC, and updated the
`Mutex`, `RwLock`, and `Semaphore` APIs to use the new implementation.
Hopefully, a subsequent change can update the bounded MPSC to use the
new semaphore as well.
Fixes #2237
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Diffstat (limited to 'tokio/src/util')
-rw-r--r--   tokio/src/util/linked_list.rs   195
1 file changed, 175 insertions, 20 deletions
diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs
index 07c25fe9..1a488032 100644
--- a/tokio/src/util/linked_list.rs
+++ b/tokio/src/util/linked_list.rs
@@ -4,6 +4,7 @@
 //! structure's APIs are `unsafe` as they require the caller to ensure the
 //! specified node is actually contained by the list.
 
+use core::fmt;
 use core::mem::ManuallyDrop;
 use core::ptr::NonNull;
 
@@ -11,7 +12,6 @@ use core::ptr::NonNull;
 ///
 /// Currently, the list is not emptied on drop. It is the caller's
 /// responsibility to ensure the list is empty before dropping it.
-#[derive(Debug)]
 pub(crate) struct LinkedList<T: Link> {
     /// Linked list head
     head: Option<NonNull<T::Target>>,
@@ -53,7 +53,6 @@ pub(crate) unsafe trait Link {
 }
 
 /// Previous / next pointers
-#[derive(Debug)]
 pub(crate) struct Pointers<T> {
     /// The previous node in the list. null if there is no previous node.
     prev: Option<NonNull<T>>,
@@ -81,7 +80,7 @@ impl<T: Link> LinkedList<T> {
         // The value should not be dropped, it is being inserted into the list
         let val = ManuallyDrop::new(val);
         let ptr = T::as_raw(&*val);
-
+        assert_ne!(self.head, Some(ptr));
         unsafe {
             T::pointers(ptr).as_mut().next = self.head;
             T::pointers(ptr).as_mut().prev = None;
@@ -165,32 +164,98 @@ impl<T: Link> LinkedList<T> {
     }
 }
 
-// ===== impl Iter =====
+cfg_sync! {
+    impl<T: Link> LinkedList<T> {
+        /// Splits this list off at `node`, returning a new list with `node` at its
+        /// front.
+        ///
+        /// If `node` is at the the front of this list, then this list will be empty after
+        /// splitting. If `node` is the last node in this list, then the returned
+        /// list will contain only `node`.
+        ///
+        /// # Safety
+        ///
+        /// The caller **must** ensure that `node` is currently contained by
+        /// `self` or not contained by any other list.
+        pub(crate) unsafe fn split_back(&mut self, node: NonNull<T::Target>) -> Self {
+            let new_tail = T::pointers(node).as_mut().prev.take().map(|prev| {
+                T::pointers(prev).as_mut().next = None;
+                prev
+            });
+            if new_tail.is_none() {
+                self.head = None;
+            }
+            let tail = std::mem::replace(&mut self.tail, new_tail);
+            Self {
+                head: Some(node),
+                tail,
+            }
+        }
 
-cfg_rt_threaded! {
-    use core::marker::PhantomData;
+        /// Takes all entries from this list, returning a new list.
+        ///
+        /// This list will be left empty.
+        pub(crate) fn take_all(&mut self) -> Self {
+            Self {
+                head: self.head.take(),
+                tail: self.tail.take(),
+            }
+        }
+    }
+}
 
-    pub(crate) struct Iter<'a, T: Link> {
-        curr: Option<NonNull<T::Target>>,
-        _p: PhantomData<&'a T>,
+impl<T: Link> fmt::Debug for LinkedList<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("LinkedList")
+            .field("head", &self.head)
+            .field("tail", &self.tail)
+            .finish()
     }
+}
 
-    impl<T: Link> LinkedList<T> {
-        pub(crate) fn iter(&self) -> Iter<'_, T> {
-            Iter {
-                curr: self.head,
-                _p: PhantomData,
-            }
+// ===== impl Iter =====
+
+#[cfg(any(feature = "sync", feature = "rt-threaded"))]
+pub(crate) struct Iter<'a, T: Link> {
+    curr: Option<NonNull<T::Target>>,
+    #[cfg(feature = "sync")]
+    curr_back: Option<NonNull<T::Target>>,
+    _p: core::marker::PhantomData<&'a T>,
+}
+
+#[cfg(any(feature = "sync", feature = "rt-threaded"))]
+impl<T: Link> LinkedList<T> {
+    pub(crate) fn iter(&self) -> Iter<'_, T> {
+        Iter {
+            curr: self.head,
+            #[cfg(feature = "sync")]
+            curr_back: self.tail,
+            _p: core::marker::PhantomData,
         }
     }
+}
 
-    impl<'a, T: Link> Iterator for Iter<'a, T> {
-        type Item = &'a T::Target;
+#[cfg(any(feature = "sync", feature = "rt-threaded"))]
+impl<'a, T: Link> Iterator for Iter<'a, T> {
+    type Item = &'a T::Target;
+
+    fn next(&mut self) -> Option<&'a T::Target> {
+        let curr = self.curr?;
+        // safety: the pointer references data contained by the list
+        self.curr = unsafe { T::pointers(curr).as_ref() }.next;
+
+        // safety: the value is still owned by the linked list.
+        Some(unsafe { &*curr.as_ptr() })
+    }
+}
+
+cfg_sync! {
+    impl<'a, T: Link> DoubleEndedIterator for Iter<'a, T> {
+        fn next_back(&mut self) -> Option<&'a T::Target> {
+            let curr = self.curr_back?;
 
-        fn next(&mut self) -> Option<&'a T::Target> {
-            let curr = self.curr?;
             // safety: the pointer references data contained by the list
-            self.curr = unsafe { T::pointers(curr).as_ref() }.next;
+            self.curr_back = unsafe { T::pointers(curr).as_ref() }.prev;
 
             // safety: the value is still owned by the linked list.
             Some(unsafe { &*curr.as_ptr() })
@@ -210,6 +275,15 @@ impl<T> Pointers<T> {
     }
 }
 
+impl<T> fmt::Debug for Pointers<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Pointers")
+            .field("prev", &self.prev)
+            .field("next", &self.next)
+            .finish()
+    }
+}
+
 #[cfg(test)]
 #[cfg(not(loom))]
 mod tests {
@@ -217,6 +291,7 @@ mod tests {
 
     use std::pin::Pin;
 
+    #[derive(Debug)]
     struct Entry {
         pointers: Pointers<Entry>,
         val: i32,
@@ -489,6 +564,86 @@ mod tests {
         assert!(i.next().is_none());
     }
 
+    #[test]
+    fn split_back() {
+        let a = entry(1);
+        let b = entry(2);
+        let c = entry(3);
+        let d = entry(4);
+
+        {
+            let mut list1 = LinkedList::<&Entry>::new();
+
+            push_all(
+                &mut list1,
+                &[a.as_ref(), b.as_ref(), c.as_ref(), d.as_ref()],
+            );
+            let mut list2 = unsafe { list1.split_back(ptr(&a)) };
+
+            assert_eq!([2, 3, 4].to_vec(), collect_list(&mut list1));
+            assert_eq!([1].to_vec(), collect_list(&mut list2));
+        }
+
+        {
+            let mut list1 = LinkedList::<&Entry>::new();
+
+            push_all(
+                &mut list1,
+                &[a.as_ref(), b.as_ref(), c.as_ref(), d.as_ref()],
+            );
+            let mut list2 = unsafe { list1.split_back(ptr(&b)) };
+
+            assert_eq!([3, 4].to_vec(), collect_list(&mut list1));
+            assert_eq!([1, 2].to_vec(), collect_list(&mut list2));
+        }
+
+        {
+            let mut list1 = LinkedList::<&Entry>::new();
+
+            push_all(
+                &mut list1,
+                &[a.as_ref(), b.as_ref(), c.as_ref(), d.as_ref()],
+            );
+            let mut list2 = unsafe { list1.split_back(ptr(&c)) };
+
+            assert_eq!([4].to_vec(), collect_list(&mut list1));
+            assert_eq!([1, 2, 3].to_vec(), collect_list(&mut list2));
+        }
+
+        {
+            let mut list1 = LinkedList::<&Entry>::new();
+
+            push_all(
+                &mut list1,
+                &[a.as_ref(), b.as_ref(), c.as_ref(), d.as_ref()],
+            );
+            let mut list2 = unsafe { list1.split_back(ptr(&d)) };
+
+            assert_eq!(Vec::<i32>::new(), collect_list(&mut list1));
+            assert_eq!([1, 2, 3, 4].to_vec(), collect_list(&mut list2));
+        }
+    }
+
+    #[test]
+    fn take_all() {
+        let mut list1 = LinkedList::<&Entry>::new();
+        let a = entry(1);
+        let b = entry(2);
+
+        list1.push_front(a.as_ref());
+        list1.push_front(b.as_ref());
+
+        assert!(!list1.is_empty());
+
+        let mut list2 = list1.take_all();
+
+        assert!(list1.is_empty());
+        assert!(!list2.is_empty());
+
+        assert_eq!(Vec::<i32>::new(), collect_list(&mut list1));
+        assert_eq!([1, 2].to_vec(), collect_list(&mut list2));
+    }
+
     proptest::proptest! {
         #[test]
         fn fuzz_linked_list(ops: Vec<usize>) {