diff options
author | bdonlan <bdonlan@gmail.com> | 2020-10-22 14:12:41 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-22 14:12:41 -0700 |
commit | c1539132110d3f8d20d22efb4b3f6a16fafd0e63 (patch) | |
tree | 88dc81a22a3732da92f2327ee5bcc93b05f1cf5a /tokio/src/io/driver | |
parent | 358e4f9f8029b6b289f8ef5a54bd7c6eae5bf969 (diff) |
io: Add AsyncFd, fix io::driver shutdown (#2903)
* io: Add AsyncFd
This adds AsyncFd, a Unix-only structure that allows the readability and
writability of arbitrary file descriptors to be monitored.
Issue: #2728
* driver: fix shutdown notification unreliability
Previously, there was a race window in which an IO driver shutting down could
fail to notify ScheduledIo instances of the shutdown; in particular, notification
of outstanding ScheduledIo registrations was driven by `Driver::drop`, but
registrations bypass `Driver` and go directly to a `Weak<Inner>`. The `Driver`
holds the `Arc<Inner>` keeping `Inner` alive, but it's possible that a new
handle could be registered (or a new readiness future created for an existing
handle) after the `Driver::drop` handler runs and prior to `Inner` being
dropped.
This change fixes the race in two parts: first, notification of outstanding
ScheduledIo handles is pushed down into the drop method of `Inner` instead;
second, we add state to ScheduledIo to ensure that we remember that the IO
driver we're bound to has shut down after the initial shutdown notification, so
that subsequent readiness future registrations can return immediately (instead
of potentially blocking indefinitely).
Fixes: #2924
Diffstat (limited to 'tokio/src/io/driver')
-rw-r--r-- | tokio/src/io/driver/mod.rs | 53 | ||||
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 33 |
2 files changed, 71 insertions, 15 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index cd82b26f..a0d8e6f2 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -7,8 +7,8 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::park::{Park, Unpark}; -use crate::util::bit; use crate::util::slab::{self, Slab}; +use crate::{loom::sync::Mutex, util::bit}; use std::fmt; use std::io; @@ -25,8 +25,10 @@ pub(crate) struct Driver { events: Option<mio::Events>, /// Primary slab handle containing the state for each resource registered - /// with this driver. - resources: Slab<ScheduledIo>, + /// with this driver. During Drop this is moved into the Inner structure, so + /// this is an Option to allow it to be vacated (until Drop this is always + /// Some) + resources: Option<Slab<ScheduledIo>>, /// The system event queue poll: mio::Poll, @@ -47,6 +49,14 @@ pub(crate) struct ReadyEvent { } pub(super) struct Inner { + /// Primary slab handle containing the state for each resource registered + /// with this driver. + /// + /// The ownership of this slab is moved into this structure during + /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles + /// without risking new ones being registered in the meantime. 
+ resources: Mutex<Option<Slab<ScheduledIo>>>, + /// Registers I/O resources registry: mio::Registry, @@ -104,9 +114,10 @@ impl Driver { Ok(Driver { tick: 0, events: Some(mio::Events::with_capacity(1024)), - resources: slab, poll, + resources: Some(slab), inner: Arc::new(Inner { + resources: Mutex::new(None), registry, io_dispatch: allocator, waker, @@ -133,7 +144,7 @@ impl Driver { self.tick = self.tick.wrapping_add(1); if self.tick == COMPACT_INTERVAL { - self.resources.compact(); + self.resources.as_mut().unwrap().compact() } let mut events = self.events.take().expect("i/o driver event store missing"); @@ -163,7 +174,9 @@ impl Driver { fn dispatch(&mut self, token: mio::Token, ready: Ready) { let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - let io = match self.resources.get(addr) { + let resources = self.resources.as_mut().unwrap(); + + let io = match resources.get(addr) { Some(io) => io, None => return, }; @@ -181,12 +194,22 @@ impl Driver { impl Drop for Driver { fn drop(&mut self) { - self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. - io.wake(Ready::ALL); - }) + (*self.inner.resources.lock()) = self.resources.take(); + } +} + +impl Drop for Inner { + fn drop(&mut self) { + let resources = self.resources.lock().take(); + + if let Some(mut slab) = resources { + slab.for_each(|io| { + // If a task is waiting on the I/O resource, notify it. The task + // will then attempt to use the I/O resource and fail due to the + // driver being shutdown. + io.shutdown(); + }); + } } } @@ -267,6 +290,12 @@ impl Handle { pub(super) fn inner(&self) -> Option<Arc<Inner>> { self.inner.upgrade() } + + cfg_net_unix! 
{ + pub(super) fn is_alive(&self) -> bool { + self.inner.strong_count() > 0 + } + } } impl Unpark for Handle { diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index b1354a05..3aefb376 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{Direction, Ready, ReadyEvent, Tick}; +use super::{Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -7,6 +7,8 @@ use crate::util::slab::Entry; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; +use super::Direction; + cfg_io_readiness! { use crate::util::linked_list::{self, LinkedList}; @@ -41,6 +43,9 @@ struct Waiters { /// Waker used for AsyncWrite writer: Option<Waker>, + + /// True if this ScheduledIo has been killed due to IO driver shutdown + is_shutdown: bool, } cfg_io_readiness! { @@ -121,6 +126,12 @@ impl ScheduledIo { GENERATION.unpack(self.readiness.load(Acquire)) } + /// Invoked when the IO driver is shut down; forces this ScheduledIo into a + /// permanently ready state. + pub(super) fn shutdown(&self) { + self.wake0(Ready::ALL, true) + } + /// Sets the readiness on this `ScheduledIo` by invoking the given closure on /// the current value, returning the previous readiness value. /// @@ -197,6 +208,10 @@ impl ScheduledIo { /// than 32 wakers to notify, if the stack array fills up, the lock is /// released, the array is cleared, and the iteration continues. 
pub(super) fn wake(&self, ready: Ready) { + self.wake0(ready, false); + } + + fn wake0(&self, ready: Ready, shutdown: bool) { const NUM_WAKERS: usize = 32; let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); @@ -204,6 +219,8 @@ impl ScheduledIo { let mut waiters = self.waiters.lock(); + waiters.is_shutdown |= shutdown; + // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { @@ -288,7 +305,12 @@ impl ScheduledIo { // taking the waiters lock let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if ready.is_empty() { + if waiters.is_shutdown { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: direction.mask(), + }) + } else if ready.is_empty() { Poll::Pending } else { Poll::Ready(ReadyEvent { @@ -401,7 +423,12 @@ cfg_io_readiness! { let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); + let mut ready = Ready::from_usize(READINESS.unpack(curr)); + + if waiters.is_shutdown { + ready = Ready::ALL; + } + let ready = ready.intersection(interest); if !ready.is_empty() { |