diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-09-23 13:02:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 13:02:15 -0700 |
commit | a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch) | |
tree | 676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/util | |
parent | f25f12d57638a2928b3f738b3b1392d8773e276e (diff) |
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an instrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
Refs: #2779, #2728
Diffstat (limited to 'tokio/src/util')
-rw-r--r-- | tokio/src/util/bit.rs | 2 | ||||
-rw-r--r-- | tokio/src/util/linked_list.rs | 54 | ||||
-rw-r--r-- | tokio/src/util/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/util/slab.rs | 2 |
4 files changed, 58 insertions, 2 deletions
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs index cf3cb196..392a0e8b 100644 --- a/tokio/src/util/bit.rs +++ b/tokio/src/util/bit.rs @@ -1,6 +1,6 @@ use std::fmt; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq)] pub(crate) struct Pack { mask: usize, shift: u32, diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 940d6af9..d493efe4 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -106,6 +106,7 @@ impl<L: Link> LinkedList<L, L::Target> { /// Removes the last element from a list and returns it, or None if it is /// empty. + #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))] pub(crate) fn pop_back(&mut self) -> Option<L::Handle> { unsafe { let last = self.tail?; @@ -125,6 +126,7 @@ impl<L: Link> LinkedList<L, L::Target> { } /// Returns whether the linked list doesn not contain any node + #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))] pub(crate) fn is_empty(&self) -> bool { if self.head.is_some() { return false; @@ -180,6 +182,12 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> { } } +impl<L: Link> Default for LinkedList<L, L::Target> { + fn default() -> Self { + Self::new() + } +} + cfg_sync! { impl<L: Link> LinkedList<L, L::Target> { pub(crate) fn last(&self) -> Option<&L::Target> { @@ -222,6 +230,52 @@ cfg_rt_threaded! { } } +// ===== impl DrainFilter ===== + +cfg_io_readiness! { + pub(crate) struct DrainFilter<'a, T: Link, F> { + list: &'a mut LinkedList<T, T::Target>, + filter: F, + curr: Option<NonNull<T::Target>>, + } + + impl<T: Link> LinkedList<T, T::Target> { + pub(crate) fn drain_filter<F>(&mut self, filter: F) -> DrainFilter<'_, T, F> + where + F: FnMut(&mut T::Target) -> bool, + { + let curr = self.head; + DrainFilter { + curr, + filter, + list: self, + } + } + } + + impl<'a, T, F> Iterator for DrainFilter<'a, T, F> + where + T: Link, + F: FnMut(&mut T::Target) -> bool, + { + type Item = T::Handle; + + fn next(&mut self) -> Option<Self::Item> { + while let Some(curr) = self.curr { + // safety: the pointer references data contained by the list + self.curr = unsafe { T::pointers(curr).as_ref() }.next; + + // safety: the value is still owned by the linked list. + if (self.filter)(unsafe { &mut *curr.as_ptr() }) { + return unsafe { self.list.remove(curr) }; + } + } + + None + } + } +} + // ===== impl Pointers ===== impl<T> Pointers<T> { diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 278d6343..ad29c0a0 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,7 +3,7 @@ cfg_io_driver! { pub(crate) mod slab; } -#[cfg(any(feature = "sync", feature = "rt-core"))] +#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))] pub(crate) mod linked_list; #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs index cb7fd5e9..854232c2 100644 --- a/tokio/src/util/slab.rs +++ b/tokio/src/util/slab.rs @@ -141,6 +141,8 @@ unsafe impl<T: Sync> Sync for Page<T> {} unsafe impl<T: Sync> Send for Page<T> {} unsafe impl<T: Sync> Sync for CachedPage<T> {} unsafe impl<T: Sync> Send for CachedPage<T> {} +unsafe impl<T: Sync> Sync for Ref<T> {} +unsafe impl<T: Sync> Send for Ref<T> {} /// A slot in the slab. Contains slot-specific metadata. /// |