summaryrefslogtreecommitdiffstats
path: root/tokio/src/util
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-09-23 13:02:15 -0700
committerGitHub <noreply@github.com>2020-09-23 13:02:15 -0700
commita0557840eb424e174bf81a0175c40f9e176a2cc2 (patch)
tree676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/util
parentf25f12d57638a2928b3f738b3b1392d8773e276e (diff)
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways: - Cleans up the cached readiness in `PollEvented`. This cache used to be helpful when readiness was a linked list of `*mut Node`s in `Registration`. Previous refactors have turned `Registration` into just an `AtomicUsize` holding the current readiness, so the cache is just extra work and complexity. Gone. - Polling the `Registration` for readiness now gives a `ReadyEvent`, which includes the driver tick. This event must be passed back into `clear_readiness`, so that the readiness is only cleared from `Registration` if the tick hasn't changed. Previously, it was possible to clear the readiness even though another thread had *just* polled the driver and found the socket ready again. - Registration now also contains an `async fn readiness`, which stores wakers in an instrusive linked list. This allows an unbounded number of tasks to register for readiness (previously, only 1 per direction (read and write)). By using the intrusive linked list, there is no concern of leaking the storage of the wakers, since they are stored inside the `async fn` and released when the future is dropped. - Registration retains a `poll_readiness(Direction)` method, to support `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and so there are 2 reserved slots for those methods. - IO types where it makes sense to have multiple tasks waiting on them now take advantage of this new `async fn readiness`, such as `UdpSocket` and `UnixDatagram`. Additionally, this makes the `io-driver` "feature" internal-only (no longer documented, not part of public API), and adds a second internal-only feature, `io-readiness`, to group together linked list part of registration that is only used by some of the IO types. After a bit of discussion, changing stream-based transports (like `TcpStream`) to have `async fn read(&self)` is punted, since that is likely too easy of a footgun to activate. Refs: #2779, #2728
Diffstat (limited to 'tokio/src/util')
-rw-r--r--tokio/src/util/bit.rs2
-rw-r--r--tokio/src/util/linked_list.rs54
-rw-r--r--tokio/src/util/mod.rs2
-rw-r--r--tokio/src/util/slab.rs2
4 files changed, 58 insertions, 2 deletions
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs
index cf3cb196..392a0e8b 100644
--- a/tokio/src/util/bit.rs
+++ b/tokio/src/util/bit.rs
@@ -1,6 +1,6 @@
use std::fmt;
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, PartialEq)]
pub(crate) struct Pack {
mask: usize,
shift: u32,
diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs
index 940d6af9..d493efe4 100644
--- a/tokio/src/util/linked_list.rs
+++ b/tokio/src/util/linked_list.rs
@@ -106,6 +106,7 @@ impl<L: Link> LinkedList<L, L::Target> {
/// Removes the last element from a list and returns it, or None if it is
/// empty.
+ #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))]
pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
unsafe {
let last = self.tail?;
@@ -125,6 +126,7 @@ impl<L: Link> LinkedList<L, L::Target> {
}
/// Returns whether the linked list doesn not contain any node
+ #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))]
pub(crate) fn is_empty(&self) -> bool {
if self.head.is_some() {
return false;
@@ -180,6 +182,12 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> {
}
}
+impl<L: Link> Default for LinkedList<L, L::Target> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
cfg_sync! {
impl<L: Link> LinkedList<L, L::Target> {
pub(crate) fn last(&self) -> Option<&L::Target> {
@@ -222,6 +230,52 @@ cfg_rt_threaded! {
}
}
+// ===== impl DrainFilter =====
+
+cfg_io_readiness! {
+ pub(crate) struct DrainFilter<'a, T: Link, F> {
+ list: &'a mut LinkedList<T, T::Target>,
+ filter: F,
+ curr: Option<NonNull<T::Target>>,
+ }
+
+ impl<T: Link> LinkedList<T, T::Target> {
+ pub(crate) fn drain_filter<F>(&mut self, filter: F) -> DrainFilter<'_, T, F>
+ where
+ F: FnMut(&mut T::Target) -> bool,
+ {
+ let curr = self.head;
+ DrainFilter {
+ curr,
+ filter,
+ list: self,
+ }
+ }
+ }
+
+ impl<'a, T, F> Iterator for DrainFilter<'a, T, F>
+ where
+ T: Link,
+ F: FnMut(&mut T::Target) -> bool,
+ {
+ type Item = T::Handle;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ while let Some(curr) = self.curr {
+ // safety: the pointer references data contained by the list
+ self.curr = unsafe { T::pointers(curr).as_ref() }.next;
+
+ // safety: the value is still owned by the linked list.
+ if (self.filter)(unsafe { &mut *curr.as_ptr() }) {
+ return unsafe { self.list.remove(curr) };
+ }
+ }
+
+ None
+ }
+ }
+}
+
// ===== impl Pointers =====
impl<T> Pointers<T> {
diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs
index 278d6343..ad29c0a0 100644
--- a/tokio/src/util/mod.rs
+++ b/tokio/src/util/mod.rs
@@ -3,7 +3,7 @@ cfg_io_driver! {
pub(crate) mod slab;
}
-#[cfg(any(feature = "sync", feature = "rt-core"))]
+#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))]
pub(crate) mod linked_list;
#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]
diff --git a/tokio/src/util/slab.rs b/tokio/src/util/slab.rs
index cb7fd5e9..854232c2 100644
--- a/tokio/src/util/slab.rs
+++ b/tokio/src/util/slab.rs
@@ -141,6 +141,8 @@ unsafe impl<T: Sync> Sync for Page<T> {}
unsafe impl<T: Sync> Send for Page<T> {}
unsafe impl<T: Sync> Sync for CachedPage<T> {}
unsafe impl<T: Sync> Send for CachedPage<T> {}
+unsafe impl<T: Sync> Sync for Ref<T> {}
+unsafe impl<T: Sync> Send for Ref<T> {}
/// A slot in the slab. Contains slot-specific metadata.
///