summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/driver/scheduled_io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/io/driver/scheduled_io.rs')
-rw-r--r--tokio/src/io/driver/scheduled_io.rs67
1 files changed, 56 insertions, 11 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index bdf21798..0c0448c3 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -32,7 +32,7 @@ cfg_io_readiness! {
#[derive(Debug, Default)]
struct Waiters {
- #[cfg(any(feature = "udp", feature = "uds"))]
+ #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
/// List of all current waiters
list: WaitList,
@@ -186,33 +186,78 @@ impl ScheduledIo {
}
}
+ /// Notifies all pending waiters that have registered interest in `ready`.
+ ///
+ /// There may be many waiters to notify. Waking the pending task **must** be
+ /// done from outside of the lock otherwise there is a potential for a
+ /// deadlock.
+ ///
+ /// A stack array of wakers is created and filled with wakers to notify, the
+ /// lock is released, and the wakers are notified. Because there may be more
+ /// than 32 wakers to notify, if the stack array fills up, the lock is
+ /// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
+ const NUM_WAKERS: usize = 32;
+
+ let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
+ let mut curr = 0;
+
let mut waiters = self.waiters.lock();
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
- waker.wake();
+ wakers[curr] = Some(waker);
+ curr += 1;
}
}
// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
- waker.wake();
+ wakers[curr] = Some(waker);
+ curr += 1;
}
}
- #[cfg(any(feature = "udp", feature = "uds"))]
- {
- // check list of waiters
- for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) {
- let waiter = unsafe { &mut *waiter.as_ptr() };
- if let Some(waker) = waiter.waker.take() {
- waiter.is_ready = true;
- waker.wake();
+ #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
+ 'outer: loop {
+ let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
+
+ while curr < NUM_WAKERS {
+ match iter.next() {
+ Some(waiter) => {
+ let waiter = unsafe { &mut *waiter.as_ptr() };
+
+ if let Some(waker) = waiter.waker.take() {
+ waiter.is_ready = true;
+ wakers[curr] = Some(waker);
+ curr += 1;
+ }
+ }
+ None => {
+ break 'outer;
+ }
}
}
+
+ drop(waiters);
+
+ for waker in wakers.iter_mut().take(curr) {
+ waker.take().unwrap().wake();
+ }
+
+ curr = 0;
+
+ // Acquire the lock again.
+ waiters = self.waiters.lock();
+ }
+
+ // Release the lock before notifying
+ drop(waiters);
+
+ for waker in wakers.iter_mut().take(curr) {
+ waker.take().unwrap().wake();
}
}