From c1232a65207e3b2d42d82b9144658f6293287089 Mon Sep 17 00:00:00 2001
From: Jon Gjengset
Date: Wed, 12 Feb 2020 14:09:44 -0500
Subject: io: avoid unnecessary wake in registration (#2221)

See discussion in #2222.

This wake/notify call has been there in one form or another since the
very early days of tokio. Currently though, it is not clear that it is
needed; the contract for polling is that you must keep polling until
you get `Pending`, so doing a wakeup when we are about to return
`Ready` is premature.
---
 tokio/src/io/driver/mod.rs          | 14 ++------
 tokio/src/io/driver/scheduled_io.rs |  1 +
 tokio/src/util/bit.rs               |  1 +
 tokio/tests/rt_basic.rs             | 65 ++++++++++++++++++++++++++++++++++++-
 tokio/tests/tcp_accept.rs           | 56 ++++++++++++++++++++++++++++++++
 5 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index 8385448c..e707d3a5 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -277,20 +277,12 @@ impl Inner {
             .get(token)
             .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
 
-        let readiness = sched
-            .get_readiness(token)
-            .unwrap_or_else(|| panic!("token {:?} no longer valid!", token));
-
-        let (waker, ready) = match dir {
-            Direction::Read => (&sched.reader, !mio::Ready::writable()),
-            Direction::Write => (&sched.writer, mio::Ready::writable()),
+        let waker = match dir {
+            Direction::Read => &sched.reader,
+            Direction::Write => &sched.writer,
         };
 
         waker.register(w);
-
-        if readiness & ready.as_usize() != 0 {
-            waker.wake();
-        }
     }
 }
 
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index e26a3588..7f6446e3 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -56,6 +56,7 @@ impl Default for ScheduledIo {
 }
 
 impl ScheduledIo {
+    #[cfg(all(test, loom))]
     /// Returns the current readiness value of this `ScheduledIo`, if the
     /// provided `token` is still a valid access.
     ///
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs
index d18298f7..e61ac216 100644
--- a/tokio/src/util/bit.rs
+++ b/tokio/src/util/bit.rs
@@ -33,6 +33,7 @@ impl Pack {
     }
 
     /// Mask used to unpack value
+    #[cfg(all(test, loom))]
     pub(crate) const fn mask(&self) -> usize {
         self.mask
     }
diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs
index 38a72692..b9e373b8 100644
--- a/tokio/tests/rt_basic.rs
+++ b/tokio/tests/rt_basic.rs
@@ -2,7 +2,7 @@
 #![cfg(feature = "full")]
 
 use tokio::runtime::Runtime;
-use tokio::sync::oneshot;
+use tokio::sync::{mpsc, oneshot};
 use tokio_test::{assert_err, assert_ok};
 
 use std::thread;
@@ -27,6 +27,69 @@ fn spawned_task_does_not_progress_without_block_on() {
     assert_eq!(out, "hello");
 }
 
+#[test]
+fn no_extra_poll() {
+    use std::pin::Pin;
+    use std::sync::{
+        atomic::{AtomicUsize, Ordering::SeqCst},
+        Arc,
+    };
+    use std::task::{Context, Poll};
+    use tokio::stream::{Stream, StreamExt};
+
+    struct TrackPolls<S> {
+        npolls: Arc<AtomicUsize>,
+        s: S,
+    }
+
+    impl<S> Stream for TrackPolls<S>
+    where
+        S: Stream,
+    {
+        type Item = S::Item;
+        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+            // safety: we do not move s
+            let this = unsafe { self.get_unchecked_mut() };
+            this.npolls.fetch_add(1, SeqCst);
+            // safety: we are pinned, and so is s
+            unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx)
+        }
+    }
+
+    let (tx, rx) = mpsc::unbounded_channel();
+    let mut rx = TrackPolls {
+        npolls: Arc::new(AtomicUsize::new(0)),
+        s: rx,
+    };
+    let npolls = Arc::clone(&rx.npolls);
+
+    let mut rt = rt();
+
+    rt.spawn(async move { while let Some(_) = rx.next().await {} });
+    rt.block_on(async {
+        tokio::task::yield_now().await;
+    });
+
+    // should have been polled exactly once: the initial poll
+    assert_eq!(npolls.load(SeqCst), 1);
+
+    tx.send(()).unwrap();
+    rt.block_on(async {
+        tokio::task::yield_now().await;
+    });
+
+    // should have been polled twice more: once to yield Some(), then once to yield Pending
+    assert_eq!(npolls.load(SeqCst), 1 + 2);
+
+    drop(tx);
+    rt.block_on(async {
+        tokio::task::yield_now().await;
+    });
+
+    // should have been polled once more: to yield None
+    assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
+}
+
 #[test]
 fn acquire_mutex_in_drop() {
     use futures::future::pending;
diff --git a/tokio/tests/tcp_accept.rs b/tokio/tests/tcp_accept.rs
index eec4338f..a5a0e2d5 100644
--- a/tokio/tests/tcp_accept.rs
+++ b/tokio/tests/tcp_accept.rs
@@ -38,3 +38,59 @@ test_accept! {
     (str_port_tuple, ("127.0.0.1", 0)),
     (ip_port_tuple, ("127.0.0.1".parse::<IpAddr>().unwrap(), 0)),
 }
+
+use std::pin::Pin;
+use std::sync::{
+    atomic::{AtomicUsize, Ordering::SeqCst},
+    Arc,
+};
+use std::task::{Context, Poll};
+use tokio::stream::{Stream, StreamExt};
+
+struct TrackPolls<S> {
+    npolls: Arc<AtomicUsize>,
+    s: S,
+}
+
+impl<S> Stream for TrackPolls<S>
+where
+    S: Stream,
+{
+    type Item = S::Item;
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        // safety: we do not move s
+        let this = unsafe { self.get_unchecked_mut() };
+        this.npolls.fetch_add(1, SeqCst);
+        // safety: we are pinned, and so is s
+        unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx)
+    }
+}
+
+#[tokio::test]
+async fn no_extra_poll() {
+    let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+    let addr = listener.local_addr().unwrap();
+
+    let (tx, rx) = oneshot::channel();
+
+    tokio::spawn(async move {
+        let mut incoming = TrackPolls {
+            npolls: Arc::new(AtomicUsize::new(0)),
+            s: listener.incoming(),
+        };
+        assert_ok!(tx.send(Arc::clone(&incoming.npolls)));
+        while let Some(_) = incoming.next().await {}
+    });
+
+    let npolls = assert_ok!(rx.await);
+    tokio::task::yield_now().await;
+
+    // should have been polled exactly once: the initial poll
+    assert_eq!(npolls.load(SeqCst), 1);
+
+    let _ = assert_ok!(TcpStream::connect(&addr).await);
+    tokio::task::yield_now().await;
+
+    // should have been polled twice more: once to yield Some(), then once to yield Pending
+    assert_eq!(npolls.load(SeqCst), 1 + 2);
+}
--
cgit v1.2.3
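
Note, appended after the patch and not part of it: below is a minimal sketch of the polling contract the commit message relies on, written as a hand-rolled leaf future. The `Shared`, `Leaf`, and `notify` names are hypothetical and exist only for illustration; this is not tokio's driver code. The point it shows is that a resource only needs its waker on the `Pending` path, because the caller is obliged to keep polling until it observes `Pending`, so waking right before returning `Ready` would only schedule the kind of redundant poll the new tests assert never happens.

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Hypothetical shared state between a leaf future and whatever event source
// completes it; an illustration of the contract, not tokio's ScheduledIo.
#[derive(Default)]
struct Shared {
    ready: bool,
    waker: Option<Waker>,
}

struct Leaf {
    shared: Arc<Mutex<Shared>>,
}

impl Future for Leaf {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.shared.lock().unwrap();
        if shared.ready {
            // About to return Ready: no self-wake is needed. The caller must
            // keep polling until it gets Pending, so a wake here would only
            // cause one extra, redundant poll.
            Poll::Ready(())
        } else {
            // Only the Pending path needs the waker: store it so the event
            // source can wake the task once readiness changes.
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

// Event-source side: flip the flag first, then wake the stored waker (if any),
// which makes the executor poll `Leaf` again and observe Ready.
fn notify(shared: &Arc<Mutex<Shared>>) {
    let waker = {
        let mut guard = shared.lock().unwrap();
        guard.ready = true;
        guard.waker.take()
    };
    if let Some(waker) = waker {
        waker.wake();
    }
}

Driving `Leaf` requires an executor (e.g. spawning it on a tokio runtime); in spirit this mirrors what `Inner::register` does after the patch: it stores the waker unconditionally and leaves all wake-ups to the readiness events delivered by the driver.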