summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorJon Gjengset <jon@thesquareplanet.com>2020-02-12 14:09:44 -0500
committerGitHub <noreply@github.com>2020-02-12 11:09:44 -0800
commitc1232a65207e3b2d42d82b9144658f6293287089 (patch)
tree883579843c03175e28526bfaba09d429d0b8a098 /tokio
parent5e75b0446d771f527d65ecc7ba34e2276eb2bf21 (diff)
io: avoid unnecessary wake in registration (#2221)
See discussion in #2222. This wake/notify call has been there in one form or another since the very early days of tokio. Currently though, it is not clear that it is needed; the contract for polling is that you must keep polling until you get `Pending`, so doing a wakeup when we are about to return `Ready` is premature.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/driver/mod.rs14
-rw-r--r--tokio/src/io/driver/scheduled_io.rs1
-rw-r--r--tokio/src/util/bit.rs1
-rw-r--r--tokio/tests/rt_basic.rs65
-rw-r--r--tokio/tests/tcp_accept.rs56
5 files changed, 125 insertions, 12 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index 8385448c..e707d3a5 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -277,20 +277,12 @@ impl Inner {
.get(token)
.unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
- let readiness = sched
- .get_readiness(token)
- .unwrap_or_else(|| panic!("token {:?} no longer valid!", token));
-
- let (waker, ready) = match dir {
- Direction::Read => (&sched.reader, !mio::Ready::writable()),
- Direction::Write => (&sched.writer, mio::Ready::writable()),
+ let waker = match dir {
+ Direction::Read => &sched.reader,
+ Direction::Write => &sched.writer,
};
waker.register(w);
-
- if readiness & ready.as_usize() != 0 {
- waker.wake();
- }
}
}
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index e26a3588..7f6446e3 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -56,6 +56,7 @@ impl Default for ScheduledIo {
}
impl ScheduledIo {
+ #[cfg(all(test, loom))]
/// Returns the current readiness value of this `ScheduledIo`, if the
/// provided `token` is still a valid access.
///
diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs
index d18298f7..e61ac216 100644
--- a/tokio/src/util/bit.rs
+++ b/tokio/src/util/bit.rs
@@ -33,6 +33,7 @@ impl Pack {
}
/// Mask used to unpack value
+ #[cfg(all(test, loom))]
pub(crate) const fn mask(&self) -> usize {
self.mask
}
diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs
index 38a72692..b9e373b8 100644
--- a/tokio/tests/rt_basic.rs
+++ b/tokio/tests/rt_basic.rs
@@ -2,7 +2,7 @@
#![cfg(feature = "full")]
use tokio::runtime::Runtime;
-use tokio::sync::oneshot;
+use tokio::sync::{mpsc, oneshot};
use tokio_test::{assert_err, assert_ok};
use std::thread;
@@ -28,6 +28,69 @@ fn spawned_task_does_not_progress_without_block_on() {
}
#[test]
+fn no_extra_poll() {
+ use std::pin::Pin;
+ use std::sync::{
+ atomic::{AtomicUsize, Ordering::SeqCst},
+ Arc,
+ };
+ use std::task::{Context, Poll};
+ use tokio::stream::{Stream, StreamExt};
+
+ struct TrackPolls<S> {
+ npolls: Arc<AtomicUsize>,
+ s: S,
+ }
+
+ impl<S> Stream for TrackPolls<S>
+ where
+ S: Stream,
+ {
+ type Item = S::Item;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // safety: we do not move s
+ let this = unsafe { self.get_unchecked_mut() };
+ this.npolls.fetch_add(1, SeqCst);
+ // safety: we are pinned, and so is s
+ unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx)
+ }
+ }
+
+ let (tx, rx) = mpsc::unbounded_channel();
+ let mut rx = TrackPolls {
+ npolls: Arc::new(AtomicUsize::new(0)),
+ s: rx,
+ };
+ let npolls = Arc::clone(&rx.npolls);
+
+ let mut rt = rt();
+
+ rt.spawn(async move { while let Some(_) = rx.next().await {} });
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ });
+
+ // should have been polled exactly once: the initial poll
+ assert_eq!(npolls.load(SeqCst), 1);
+
+ tx.send(()).unwrap();
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ });
+
+ // should have been polled twice more: once to yield Some(), then once to yield Pending
+ assert_eq!(npolls.load(SeqCst), 1 + 2);
+
+ drop(tx);
+ rt.block_on(async {
+ tokio::task::yield_now().await;
+ });
+
+ // should have been polled once more: to yield None
+ assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
+}
+
+#[test]
fn acquire_mutex_in_drop() {
use futures::future::pending;
use tokio::task;
diff --git a/tokio/tests/tcp_accept.rs b/tokio/tests/tcp_accept.rs
index eec4338f..a5a0e2d5 100644
--- a/tokio/tests/tcp_accept.rs
+++ b/tokio/tests/tcp_accept.rs
@@ -38,3 +38,59 @@ test_accept! {
(str_port_tuple, ("127.0.0.1", 0)),
(ip_port_tuple, ("127.0.0.1".parse::<IpAddr>().unwrap(), 0)),
}
+
+use std::pin::Pin;
+use std::sync::{
+ atomic::{AtomicUsize, Ordering::SeqCst},
+ Arc,
+};
+use std::task::{Context, Poll};
+use tokio::stream::{Stream, StreamExt};
+
+struct TrackPolls<S> {
+ npolls: Arc<AtomicUsize>,
+ s: S,
+}
+
+impl<S> Stream for TrackPolls<S>
+where
+ S: Stream,
+{
+ type Item = S::Item;
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ // safety: we do not move s
+ let this = unsafe { self.get_unchecked_mut() };
+ this.npolls.fetch_add(1, SeqCst);
+ // safety: we are pinned, and so is s
+ unsafe { Pin::new_unchecked(&mut this.s) }.poll_next(cx)
+ }
+}
+
+#[tokio::test]
+async fn no_extra_poll() {
+ let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+ let addr = listener.local_addr().unwrap();
+
+ let (tx, rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ let mut incoming = TrackPolls {
+ npolls: Arc::new(AtomicUsize::new(0)),
+ s: listener.incoming(),
+ };
+ assert_ok!(tx.send(Arc::clone(&incoming.npolls)));
+ while let Some(_) = incoming.next().await {}
+ });
+
+ let npolls = assert_ok!(rx.await);
+ tokio::task::yield_now().await;
+
+ // should have been polled exactly once: the initial poll
+ assert_eq!(npolls.load(SeqCst), 1);
+
+ let _ = assert_ok!(TcpStream::connect(&addr).await);
+ tokio::task::yield_now().await;
+
+ // should have been polled twice more: once to yield Some(), then once to yield Pending
+ assert_eq!(npolls.load(SeqCst), 1 + 2);
+}