diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-09-23 13:02:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 13:02:15 -0700 |
commit | a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch) | |
tree | 676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/tests | |
parent | f25f12d57638a2928b3f738b3b1392d8773e276e (diff) |
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an instrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
Refs: #2779, #2728
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/async_send_sync.rs | 4 | ||||
-rw-r--r-- | tokio/tests/rt_common.rs | 4 | ||||
-rw-r--r-- | tokio/tests/udp.rs | 34 | ||||
-rw-r--r-- | tokio/tests/uds_datagram.rs | 34 |
4 files changed, 17 insertions, 59 deletions
diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index e7011e3b..b3492b5e 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -172,10 +172,6 @@ async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::RecvHalf::recv(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::RecvHalf::recv_from(_, &mut [u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::SendHalf::send(_, &[u8]): Send & Sync); -async_assert_fn!(tokio::net::udp::SendHalf::send_to(_, &[u8], &SocketAddr): Send & Sync); #[cfg(unix)] mod unix_datagram { diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index a8968be1..7f0491c4 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -827,6 +827,7 @@ rt_test! { #[test] fn io_notify_while_shutting_down() { use std::net::Ipv6Addr; + use std::sync::Arc; for _ in 1..10 { let runtime = rt(); @@ -834,7 +835,8 @@ rt_test! { runtime.block_on(async { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); let addr = socket.local_addr().unwrap(); - let (mut recv_half, mut send_half) = socket.split(); + let send_half = Arc::new(socket); + let recv_half = send_half.clone(); tokio::spawn(async move { let mut buf = [0]; diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 62a2234f..9da67267 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::sync::Arc; use tokio::net::UdpSocket; const MSG: &[u8] = b"hello"; @@ -8,8 +9,8 @@ const MSG_LEN: usize = MSG.len(); #[tokio::test] async fn send_recv() -> std::io::Result<()> { - let mut sender = UdpSocket::bind("127.0.0.1:0").await?; - let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; sender.connect(receiver.local_addr()?).await?; receiver.connect(sender.local_addr()?).await?; @@ -25,8 +26,8 @@ async fn send_recv() -> std::io::Result<()> { #[tokio::test] async fn send_to_recv_from() -> std::io::Result<()> { - let mut sender = UdpSocket::bind("127.0.0.1:0").await?; - let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; + let sender = UdpSocket::bind("127.0.0.1:0").await?; + let receiver = UdpSocket::bind("127.0.0.1:0").await?; let receiver_addr = receiver.local_addr()?; sender.send_to(MSG, &receiver_addr).await?; @@ -42,9 +43,10 @@ async fn send_to_recv_from() -> std::io::Result<()> { #[tokio::test] async fn split() -> std::io::Result<()> { let socket = UdpSocket::bind("127.0.0.1:0").await?; - let (mut r, mut s) = socket.split(); + let s = Arc::new(socket); + let r = s.clone(); - let addr = s.as_ref().local_addr()?; + let addr = s.local_addr()?; tokio::spawn(async move { s.send_to(MSG, &addr).await.unwrap(); }); @@ -54,24 +56,6 @@ async fn split() -> std::io::Result<()> { Ok(()) } -#[tokio::test] -async fn reunite() -> std::io::Result<()> { - let socket = UdpSocket::bind("127.0.0.1:0").await?; - let (s, r) = socket.split(); - assert!(s.reunite(r).is_ok()); - Ok(()) -} - -#[tokio::test] -async fn reunite_error() -> std::io::Result<()> { - let socket = UdpSocket::bind("127.0.0.1:0").await?; - let socket1 = UdpSocket::bind("127.0.0.1:0").await?; - let (s, _) = socket.split(); - let (_, r1) = socket1.split(); - assert!(s.reunite(r1).is_err()); - Ok(()) -} - // # Note // // This test is purposely written such that each time `sender` sends data on @@ -86,7 +70,7 @@ async fn try_send_spawn() { const MSG2_LEN: usize = MSG2.len(); let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let mut receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); receiver .connect(sender.local_addr().unwrap()) diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index d3c3535e..bd47f19e 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -6,8 +6,9 @@ use tokio::net::UnixDatagram; use tokio::try_join; use std::io; +use std::sync::Arc; -async fn echo_server(mut socket: UnixDatagram) -> io::Result<()> { +async fn echo_server(socket: UnixDatagram) -> io::Result<()> { let mut recv_buf = vec![0u8; 1024]; loop { let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?; @@ -32,7 +33,7 @@ async fn echo() -> io::Result<()> { }); { - let mut socket = UnixDatagram::bind(&client_path).unwrap(); + let socket = UnixDatagram::bind(&client_path).unwrap(); socket.connect(server_path)?; socket.send(b"ECHO").await?; let mut recv_buf = [0u8; 16]; @@ -87,8 +88,8 @@ async fn try_send_recv_never_block() -> io::Result<()> { async fn split() -> std::io::Result<()> { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("split.sock"); - let socket = UnixDatagram::bind(path.clone())?; - let (mut r, mut s) = socket.into_split(); + let s = Arc::new(UnixDatagram::bind(path.clone())?); + let r = s.clone(); let msg = b"hello"; let ((), ()) = try_join! { @@ -106,28 +107,3 @@ async fn split() -> std::io::Result<()> { Ok(()) } - -#[tokio::test] -async fn reunite() -> std::io::Result<()> { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("reunite.sock"); - let socket = UnixDatagram::bind(path)?; - let (s, r) = socket.into_split(); - assert!(s.reunite(r).is_ok()); - Ok(()) -} - -#[tokio::test] -async fn reunite_error() -> std::io::Result<()> { - let dir = tempfile::tempdir().unwrap(); - let path = dir.path().join("reunit.sock"); - let dir = tempfile::tempdir().unwrap(); - let path1 = dir.path().join("reunit.sock"); - let socket = UnixDatagram::bind(path)?; - let socket1 = UnixDatagram::bind(path1)?; - - let (s, _) = socket.into_split(); - let (_, r1) = socket1.into_split(); - assert!(s.reunite(r1).is_err()); - Ok(()) -} |