diff options
Diffstat (limited to 'tokio/src/io/driver/scheduled_io.rs')
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 67 |
1 files changed, 56 insertions, 11 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index bdf21798..0c0448c3 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -32,7 +32,7 @@ cfg_io_readiness! { #[derive(Debug, Default)] struct Waiters { - #[cfg(any(feature = "udp", feature = "uds"))] + #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] /// List of all current waiters list: WaitList, @@ -186,33 +186,78 @@ impl ScheduledIo { } } + /// Notifies all pending waiters that have registered interest in `ready`. + /// + /// There may be many waiters to notify. Waking the pending task **must** be + /// done from outside of the lock otherwise there is a potential for a + /// deadlock. + /// + /// A stack array of wakers is created and filled with wakers to notify, the + /// lock is released, and the wakers are notified. Because there may be more + /// than 32 wakers to notify, if the stack array fills up, the lock is + /// released, the array is cleared, and the iteration continues. pub(super) fn wake(&self, ready: Ready) { + const NUM_WAKERS: usize = 32; + + let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default(); + let mut curr = 0; + let mut waiters = self.waiters.lock(); // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { - waker.wake(); + wakers[curr] = Some(waker); + curr += 1; } } // check for AsyncWrite slot if ready.is_writable() { if let Some(waker) = waiters.writer.take() { - waker.wake(); + wakers[curr] = Some(waker); + curr += 1; } } - #[cfg(any(feature = "udp", feature = "uds"))] - { - // check list of waiters - for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) { - let waiter = unsafe { &mut *waiter.as_ptr() }; - if let Some(waker) = waiter.waker.take() { - waiter.is_ready = true; - waker.wake(); + #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] + 'outer: loop { + let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); + + while curr < NUM_WAKERS { + match iter.next() { + Some(waiter) => { + let waiter = unsafe { &mut *waiter.as_ptr() }; + + if let Some(waker) = waiter.waker.take() { + waiter.is_ready = true; + wakers[curr] = Some(waker); + curr += 1; + } + } + None => { + break 'outer; + } } } + + drop(waiters); + + for waker in wakers.iter_mut().take(curr) { + waker.take().unwrap().wake(); + } + + curr = 0; + + // Acquire the lock again. + waiters = self.waiters.lock(); + } + + // Release the lock before notifying + drop(waiters); + + for waker in wakers.iter_mut().take(curr) { + waker.take().unwrap().wake(); } } |