diff options
Diffstat (limited to 'tokio/src/io/driver/mod.rs')
-rw-r--r-- | tokio/src/io/driver/mod.rs | 53 |
1 files changed, 41 insertions, 12 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index cd82b26f..a0d8e6f2 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -7,8 +7,8 @@ mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests use crate::park::{Park, Unpark}; -use crate::util::bit; use crate::util::slab::{self, Slab}; +use crate::{loom::sync::Mutex, util::bit}; use std::fmt; use std::io; @@ -25,8 +25,10 @@ pub(crate) struct Driver { events: Option<mio::Events>, /// Primary slab handle containing the state for each resource registered - /// with this driver. - resources: Slab<ScheduledIo>, + /// with this driver. During Drop this is moved into the Inner structure, so + /// this is an Option to allow it to be vacated (until Drop this is always + /// Some) + resources: Option<Slab<ScheduledIo>>, /// The system event queue poll: mio::Poll, @@ -47,6 +49,14 @@ pub(crate) struct ReadyEvent { } pub(super) struct Inner { + /// Primary slab handle containing the state for each resource registered + /// with this driver. + /// + /// The ownership of this slab is moved into this structure during + /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles + /// without risking new ones being registered in the meantime. + resources: Mutex<Option<Slab<ScheduledIo>>>, + /// Registers I/O resources registry: mio::Registry, @@ -104,9 +114,10 @@ impl Driver { Ok(Driver { tick: 0, events: Some(mio::Events::with_capacity(1024)), - resources: slab, poll, + resources: Some(slab), inner: Arc::new(Inner { + resources: Mutex::new(None), registry, io_dispatch: allocator, waker, @@ -133,7 +144,7 @@ impl Driver { self.tick = self.tick.wrapping_add(1); if self.tick == COMPACT_INTERVAL { - self.resources.compact(); + self.resources.as_mut().unwrap().compact() } let mut events = self.events.take().expect("i/o driver event store missing"); @@ -163,7 +174,9 @@ impl Driver { fn dispatch(&mut self, token: mio::Token, ready: Ready) { let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - let io = match self.resources.get(addr) { + let resources = self.resources.as_mut().unwrap(); + + let io = match resources.get(addr) { Some(io) => io, None => return, }; @@ -181,12 +194,22 @@ impl Driver { impl Drop for Driver { fn drop(&mut self) { - self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. - io.wake(Ready::ALL); - }) + (*self.inner.resources.lock()) = self.resources.take(); + } +} + +impl Drop for Inner { + fn drop(&mut self) { + let resources = self.resources.lock().take(); + + if let Some(mut slab) = resources { + slab.for_each(|io| { + // If a task is waiting on the I/O resource, notify it. The task + // will then attempt to use the I/O resource and fail due to the + // driver being shutdown. + io.shutdown(); + }); + } } } @@ -267,6 +290,12 @@ impl Handle { pub(super) fn inner(&self) -> Option<Arc<Inner>> { self.inner.upgrade() } + + cfg_net_unix! { + pub(super) fn is_alive(&self) -> bool { + self.inner.strong_count() > 0 + } + } } impl Unpark for Handle { |