summaryrefslogtreecommitdiffstats
path: root/tokio/tests
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-09-23 13:02:15 -0700
committerGitHub <noreply@github.com>2020-09-23 13:02:15 -0700
commita0557840eb424e174bf81a0175c40f9e176a2cc2 (patch)
tree676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/tests
parentf25f12d57638a2928b3f738b3b1392d8773e276e (diff)
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways: - Cleans up the cached readiness in `PollEvented`. This cache used to be helpful when readiness was a linked list of `*mut Node`s in `Registration`. Previous refactors have turned `Registration` into just an `AtomicUsize` holding the current readiness, so the cache is just extra work and complexity. Gone. - Polling the `Registration` for readiness now gives a `ReadyEvent`, which includes the driver tick. This event must be passed back into `clear_readiness`, so that the readiness is only cleared from `Registration` if the tick hasn't changed. Previously, it was possible to clear the readiness even though another thread had *just* polled the driver and found the socket ready again. - Registration now also contains an `async fn readiness`, which stores wakers in an instrusive linked list. This allows an unbounded number of tasks to register for readiness (previously, only 1 per direction (read and write)). By using the intrusive linked list, there is no concern of leaking the storage of the wakers, since they are stored inside the `async fn` and released when the future is dropped. - Registration retains a `poll_readiness(Direction)` method, to support `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and so there are 2 reserved slots for those methods. - IO types where it makes sense to have multiple tasks waiting on them now take advantage of this new `async fn readiness`, such as `UdpSocket` and `UnixDatagram`. Additionally, this makes the `io-driver` "feature" internal-only (no longer documented, not part of public API), and adds a second internal-only feature, `io-readiness`, to group together linked list part of registration that is only used by some of the IO types. After a bit of discussion, changing stream-based transports (like `TcpStream`) to have `async fn read(&self)` is punted, since that is likely too easy of a footgun to activate. Refs: #2779, #2728
Diffstat (limited to 'tokio/tests')
-rw-r--r--tokio/tests/async_send_sync.rs4
-rw-r--r--tokio/tests/rt_common.rs4
-rw-r--r--tokio/tests/udp.rs34
-rw-r--r--tokio/tests/uds_datagram.rs34
4 files changed, 17 insertions, 59 deletions
diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs
index e7011e3b..b3492b5e 100644
--- a/tokio/tests/async_send_sync.rs
+++ b/tokio/tests/async_send_sync.rs
@@ -172,10 +172,6 @@ async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync);
async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync);
async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync);
async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync);
-async_assert_fn!(tokio::net::udp::RecvHalf::recv(_, &mut [u8]): Send & Sync);
-async_assert_fn!(tokio::net::udp::RecvHalf::recv_from(_, &mut [u8]): Send & Sync);
-async_assert_fn!(tokio::net::udp::SendHalf::send(_, &[u8]): Send & Sync);
-async_assert_fn!(tokio::net::udp::SendHalf::send_to(_, &[u8], &SocketAddr): Send & Sync);
#[cfg(unix)]
mod unix_datagram {
diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs
index a8968be1..7f0491c4 100644
--- a/tokio/tests/rt_common.rs
+++ b/tokio/tests/rt_common.rs
@@ -827,6 +827,7 @@ rt_test! {
#[test]
fn io_notify_while_shutting_down() {
use std::net::Ipv6Addr;
+ use std::sync::Arc;
for _ in 1..10 {
let runtime = rt();
@@ -834,7 +835,8 @@ rt_test! {
runtime.block_on(async {
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
let addr = socket.local_addr().unwrap();
- let (mut recv_half, mut send_half) = socket.split();
+ let send_half = Arc::new(socket);
+ let recv_half = send_half.clone();
tokio::spawn(async move {
let mut buf = [0];
diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs
index 62a2234f..9da67267 100644
--- a/tokio/tests/udp.rs
+++ b/tokio/tests/udp.rs
@@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
+use std::sync::Arc;
use tokio::net::UdpSocket;
const MSG: &[u8] = b"hello";
@@ -8,8 +9,8 @@ const MSG_LEN: usize = MSG.len();
#[tokio::test]
async fn send_recv() -> std::io::Result<()> {
- let mut sender = UdpSocket::bind("127.0.0.1:0").await?;
- let mut receiver = UdpSocket::bind("127.0.0.1:0").await?;
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
sender.connect(receiver.local_addr()?).await?;
receiver.connect(sender.local_addr()?).await?;
@@ -25,8 +26,8 @@ async fn send_recv() -> std::io::Result<()> {
#[tokio::test]
async fn send_to_recv_from() -> std::io::Result<()> {
- let mut sender = UdpSocket::bind("127.0.0.1:0").await?;
- let mut receiver = UdpSocket::bind("127.0.0.1:0").await?;
+ let sender = UdpSocket::bind("127.0.0.1:0").await?;
+ let receiver = UdpSocket::bind("127.0.0.1:0").await?;
let receiver_addr = receiver.local_addr()?;
sender.send_to(MSG, &receiver_addr).await?;
@@ -42,9 +43,10 @@ async fn send_to_recv_from() -> std::io::Result<()> {
#[tokio::test]
async fn split() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:0").await?;
- let (mut r, mut s) = socket.split();
+ let s = Arc::new(socket);
+ let r = s.clone();
- let addr = s.as_ref().local_addr()?;
+ let addr = s.local_addr()?;
tokio::spawn(async move {
s.send_to(MSG, &addr).await.unwrap();
});
@@ -54,24 +56,6 @@ async fn split() -> std::io::Result<()> {
Ok(())
}
-#[tokio::test]
-async fn reunite() -> std::io::Result<()> {
- let socket = UdpSocket::bind("127.0.0.1:0").await?;
- let (s, r) = socket.split();
- assert!(s.reunite(r).is_ok());
- Ok(())
-}
-
-#[tokio::test]
-async fn reunite_error() -> std::io::Result<()> {
- let socket = UdpSocket::bind("127.0.0.1:0").await?;
- let socket1 = UdpSocket::bind("127.0.0.1:0").await?;
- let (s, _) = socket.split();
- let (_, r1) = socket1.split();
- assert!(s.reunite(r1).is_err());
- Ok(())
-}
-
// # Note
//
// This test is purposely written such that each time `sender` sends data on
@@ -86,7 +70,7 @@ async fn try_send_spawn() {
const MSG2_LEN: usize = MSG2.len();
let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
- let mut receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
receiver
.connect(sender.local_addr().unwrap())
diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs
index d3c3535e..bd47f19e 100644
--- a/tokio/tests/uds_datagram.rs
+++ b/tokio/tests/uds_datagram.rs
@@ -6,8 +6,9 @@ use tokio::net::UnixDatagram;
use tokio::try_join;
use std::io;
+use std::sync::Arc;
-async fn echo_server(mut socket: UnixDatagram) -> io::Result<()> {
+async fn echo_server(socket: UnixDatagram) -> io::Result<()> {
let mut recv_buf = vec![0u8; 1024];
loop {
let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?;
@@ -32,7 +33,7 @@ async fn echo() -> io::Result<()> {
});
{
- let mut socket = UnixDatagram::bind(&client_path).unwrap();
+ let socket = UnixDatagram::bind(&client_path).unwrap();
socket.connect(server_path)?;
socket.send(b"ECHO").await?;
let mut recv_buf = [0u8; 16];
@@ -87,8 +88,8 @@ async fn try_send_recv_never_block() -> io::Result<()> {
async fn split() -> std::io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("split.sock");
- let socket = UnixDatagram::bind(path.clone())?;
- let (mut r, mut s) = socket.into_split();
+ let s = Arc::new(UnixDatagram::bind(path.clone())?);
+ let r = s.clone();
let msg = b"hello";
let ((), ()) = try_join! {
@@ -106,28 +107,3 @@ async fn split() -> std::io::Result<()> {
Ok(())
}
-
-#[tokio::test]
-async fn reunite() -> std::io::Result<()> {
- let dir = tempfile::tempdir().unwrap();
- let path = dir.path().join("reunite.sock");
- let socket = UnixDatagram::bind(path)?;
- let (s, r) = socket.into_split();
- assert!(s.reunite(r).is_ok());
- Ok(())
-}
-
-#[tokio::test]
-async fn reunite_error() -> std::io::Result<()> {
- let dir = tempfile::tempdir().unwrap();
- let path = dir.path().join("reunit.sock");
- let dir = tempfile::tempdir().unwrap();
- let path1 = dir.path().join("reunit.sock");
- let socket = UnixDatagram::bind(path)?;
- let socket1 = UnixDatagram::bind(path1)?;
-
- let (s, _) = socket.into_split();
- let (_, r1) = socket1.into_split();
- assert!(s.reunite(r1).is_err());
- Ok(())
-}