summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/driver/scheduled_io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/io/driver/scheduled_io.rs')
-rw-r--r--tokio/src/io/driver/scheduled_io.rs33
1 files changed, 30 insertions, 3 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index b1354a05..3aefb376 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -1,4 +1,4 @@
-use super::{Direction, Ready, ReadyEvent, Tick};
+use super::{Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
@@ -7,6 +7,8 @@ use crate::util::slab::Entry;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
+use super::Direction;
+
cfg_io_readiness! {
use crate::util::linked_list::{self, LinkedList};
@@ -41,6 +43,9 @@ struct Waiters {
/// Waker used for AsyncWrite
writer: Option<Waker>,
+
+ /// True if this ScheduledIo has been killed due to IO driver shutdown
+ is_shutdown: bool,
}
cfg_io_readiness! {
@@ -121,6 +126,12 @@ impl ScheduledIo {
GENERATION.unpack(self.readiness.load(Acquire))
}
+ /// Invoked when the IO driver is shut down; forces this ScheduledIo into a
+ /// permanently ready state.
+ pub(super) fn shutdown(&self) {
+ self.wake0(Ready::ALL, true)
+ }
+
/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
/// the current value, returning the previous readiness value.
///
@@ -197,6 +208,10 @@ impl ScheduledIo {
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
+ self.wake0(ready, false);
+ }
+
+ fn wake0(&self, ready: Ready, shutdown: bool) {
const NUM_WAKERS: usize = 32;
let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
@@ -204,6 +219,8 @@ impl ScheduledIo {
let mut waiters = self.waiters.lock();
+ waiters.is_shutdown |= shutdown;
+
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
@@ -288,7 +305,12 @@ impl ScheduledIo {
// taking the waiters lock
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
- if ready.is_empty() {
+ if waiters.is_shutdown {
+ Poll::Ready(ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready: direction.mask(),
+ })
+ } else if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(ReadyEvent {
@@ -401,7 +423,12 @@ cfg_io_readiness! {
let mut waiters = scheduled_io.waiters.lock();
let curr = scheduled_io.readiness.load(SeqCst);
- let ready = Ready::from_usize(READINESS.unpack(curr));
+ let mut ready = Ready::from_usize(READINESS.unpack(curr));
+
+ if waiters.is_shutdown {
+ ready = Ready::ALL;
+ }
+
let ready = ready.intersection(interest);
if !ready.is_empty() {