diff options
Diffstat (limited to 'tokio/src/io/driver/scheduled_io.rs')
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 33 |
1 files changed, 30 insertions, 3 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index b1354a05..3aefb376 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{Direction, Ready, ReadyEvent, Tick}; +use super::{Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -7,6 +7,8 @@ use crate::util::slab::Entry; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; +use super::Direction; + cfg_io_readiness! { use crate::util::linked_list::{self, LinkedList}; @@ -41,6 +43,9 @@ struct Waiters { /// Waker used for AsyncWrite writer: Option<Waker>, + + /// True if this ScheduledIo has been killed due to IO driver shutdown + is_shutdown: bool, } cfg_io_readiness! { @@ -121,6 +126,12 @@ impl ScheduledIo { GENERATION.unpack(self.readiness.load(Acquire)) } + /// Invoked when the IO driver is shut down; forces this ScheduledIo into a + /// permanently ready state. + pub(super) fn shutdown(&self) { + self.wake0(Ready::ALL, true) + } + /// Sets the readiness on this `ScheduledIo` by invoking the given closure on /// the current value, returning the previous readiness value. /// @@ -197,6 +208,10 @@ impl ScheduledIo { /// than 32 wakers to notify, if the stack array fills up, the lock is /// released, the array is cleared, and the iteration continues. pub(super) fn wake(&self, ready: Ready) { + self.wake0(ready, false); + } + + fn wake0(&self, ready: Ready, shutdown: bool) { const NUM_WAKERS: usize = 32; let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); @@ -204,6 +219,8 @@ impl ScheduledIo { let mut waiters = self.waiters.lock(); + waiters.is_shutdown |= shutdown; + // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { @@ -288,7 +305,12 @@ impl ScheduledIo { // taking the waiters lock let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if ready.is_empty() { + if waiters.is_shutdown { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: direction.mask(), + }) + } else if ready.is_empty() { Poll::Pending } else { Poll::Ready(ReadyEvent { @@ -401,7 +423,12 @@ cfg_io_readiness! { let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); + let mut ready = Ready::from_usize(READINESS.unpack(curr)); + + if waiters.is_shutdown { + ready = Ready::ALL; + } + let ready = ready.intersection(interest); if !ready.is_empty() { |