summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/driver/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/io/driver/mod.rs')
-rw-r--r--tokio/src/io/driver/mod.rs53
1 files changed, 41 insertions, 12 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index cd82b26f..a0d8e6f2 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -7,8 +7,8 @@ mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
use crate::park::{Park, Unpark};
-use crate::util::bit;
use crate::util::slab::{self, Slab};
+use crate::{loom::sync::Mutex, util::bit};
use std::fmt;
use std::io;
@@ -25,8 +25,10 @@ pub(crate) struct Driver {
events: Option<mio::Events>,
/// Primary slab handle containing the state for each resource registered
- /// with this driver.
- resources: Slab<ScheduledIo>,
+ /// with this driver. During Drop this is moved into the Inner structure, so
+ /// this is an Option to allow it to be vacated (until Drop this is always
+ /// Some)
+ resources: Option<Slab<ScheduledIo>>,
/// The system event queue
poll: mio::Poll,
@@ -47,6 +49,14 @@ pub(crate) struct ReadyEvent {
}
pub(super) struct Inner {
+ /// Primary slab handle containing the state for each resource registered
+ /// with this driver.
+ ///
+ /// The ownership of this slab is moved into this structure during
+ /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
+ /// without risking new ones being registered in the meantime.
+ resources: Mutex<Option<Slab<ScheduledIo>>>,
+
/// Registers I/O resources
registry: mio::Registry,
@@ -104,9 +114,10 @@ impl Driver {
Ok(Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
- resources: slab,
poll,
+ resources: Some(slab),
inner: Arc::new(Inner {
+ resources: Mutex::new(None),
registry,
io_dispatch: allocator,
waker,
@@ -133,7 +144,7 @@ impl Driver {
self.tick = self.tick.wrapping_add(1);
if self.tick == COMPACT_INTERVAL {
- self.resources.compact();
+ self.resources.as_mut().unwrap().compact()
}
let mut events = self.events.take().expect("i/o driver event store missing");
@@ -163,7 +174,9 @@ impl Driver {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
- let io = match self.resources.get(addr) {
+ let resources = self.resources.as_mut().unwrap();
+
+ let io = match resources.get(addr) {
Some(io) => io,
None => return,
};
@@ -181,12 +194,22 @@ impl Driver {
impl Drop for Driver {
fn drop(&mut self) {
- self.resources.for_each(|io| {
- // If a task is waiting on the I/O resource, notify it. The task
- // will then attempt to use the I/O resource and fail due to the
- // driver being shutdown.
- io.wake(Ready::ALL);
- })
+ (*self.inner.resources.lock()) = self.resources.take();
+ }
+}
+
+impl Drop for Inner {
+ fn drop(&mut self) {
+ let resources = self.resources.lock().take();
+
+ if let Some(mut slab) = resources {
+ slab.for_each(|io| {
+ // If a task is waiting on the I/O resource, notify it. The task
+ // will then attempt to use the I/O resource and fail due to the
+ // driver being shutdown.
+ io.shutdown();
+ });
+ }
}
}
@@ -267,6 +290,12 @@ impl Handle {
pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
+
+ cfg_net_unix! {
+ pub(super) fn is_alive(&self) -> bool {
+ self.inner.strong_count() > 0
+ }
+ }
}
impl Unpark for Handle {