summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-09-25 16:34:40 -0700
committerGitHub <noreply@github.com>2020-09-25 16:34:40 -0700
commitdfdfd61372cca02087c0e2773dc978d03235bc51 (patch)
treec3401db75087a6df56b9eba06c1dbdb8cee6caa0
parent55d932a21fd4c5fa298ca3cfdcb1388dbbf43dd0 (diff)
Fix readiness future eagerly consuming entire socket readiness (#2887)
In the `readiness` future, before inserting a waiter into the list, the current socket readiness is eagerly checked. However, it would return as a `ReadyEvent` the entire socket readiness, instead of just the interest desired from `readiness(interest)`. This would result in the later call to `clear_readiness(event)` removing all of it. Closes #2886
-rw-r--r--tokio/src/io/driver/scheduled_io.rs2
-rw-r--r--tokio/tests/udp.rs32
2 files changed, 33 insertions, 1 deletions
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 88daeb2d..f63fd7ab 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -371,7 +371,7 @@ cfg_io_readiness! {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
- return Poll::Ready(ReadyEvent { readiness, tick });
+ return Poll::Ready(ReadyEvent { readiness: interest, tick });
}
// Wasn't ready, take the lock (and check again while locked).
diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs
index 9da67267..0bea83aa 100644
--- a/tokio/tests/udp.rs
+++ b/tokio/tests/udp.rs
@@ -56,6 +56,38 @@ async fn split() -> std::io::Result<()> {
Ok(())
}
+#[tokio::test]
+async fn split_chan() -> std::io::Result<()> {
+ // setup UdpSocket that will echo all sent items
+ let socket = UdpSocket::bind("127.0.0.1:0").await?;
+ let addr = socket.local_addr().unwrap();
+ let s = Arc::new(socket);
+ let r = s.clone();
+
+ let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
+ tokio::spawn(async move {
+ while let Some((bytes, addr)) = rx.recv().await {
+ s.send_to(&bytes, &addr).await.unwrap();
+ }
+ });
+
+ tokio::spawn(async move {
+ let mut buf = [0u8; 32];
+ loop {
+ let (len, addr) = r.recv_from(&mut buf).await.unwrap();
+ tx.send((buf[..len].to_vec(), addr)).await.unwrap();
+ }
+ });
+
+ // test that we can send a value and get back some response
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ sender.send_to(MSG, addr).await?;
+ let mut recv_buf = [0u8; 32];
+ let (len, _) = sender.recv_from(&mut recv_buf).await?;
+ assert_eq!(&recv_buf[..len], MSG);
+ Ok(())
+}
+
// # Note
//
// This test is purposely written such that each time `sender` sends data on