diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-09-23 13:02:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 13:02:15 -0700 |
commit | a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch) | |
tree | 676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /examples | |
parent | f25f12d57638a2928b3f738b3b1392d8773e276e (diff) |
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an instrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
Refs: #2779, #2728
Diffstat (limited to 'examples')
-rw-r--r-- | examples/connect.rs | 8 | ||||
-rw-r--r-- | examples/echo-udp.rs | 2 | ||||
-rw-r--r-- | examples/udp-client.rs | 2 | ||||
-rw-r--r-- | examples/udp-codec.rs | 6 |
4 files changed, 11 insertions, 7 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index 5d0515a7..6e909b25 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -96,7 +96,6 @@ mod udp { use std::error::Error; use std::io; use std::net::SocketAddr; - use tokio::net::udp::{RecvHalf, SendHalf}; use tokio::net::UdpSocket; pub async fn connect( @@ -114,16 +113,15 @@ mod udp { let socket = UdpSocket::bind(&bind_addr).await?; socket.connect(addr).await?; - let (mut r, mut w) = socket.split(); - future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?; + future::try_join(send(stdin, &socket), recv(stdout, &socket)).await?; Ok(()) } async fn send( mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, - writer: &mut SendHalf, + writer: &UdpSocket, ) -> Result<(), io::Error> { while let Some(item) = stdin.next().await { let buf = item?; @@ -135,7 +133,7 @@ mod udp { async fn recv( mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, - reader: &mut RecvHalf, + reader: &UdpSocket, ) -> Result<(), io::Error> { loop { let mut buf = vec![0; 1024]; diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index bc688b9b..3027c869 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -26,7 +26,7 @@ struct Server { impl Server { async fn run(self) -> Result<(), io::Error> { let Server { - mut socket, + socket, mut buf, mut to_send, } = self; diff --git a/examples/udp-client.rs b/examples/udp-client.rs index a191033d..a394ee66 100644 --- a/examples/udp-client.rs +++ b/examples/udp-client.rs @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> { } .parse()?; - let mut socket = UdpSocket::bind(local_addr).await?; + let socket = UdpSocket::bind(local_addr).await?; const MAX_DATAGRAM_SIZE: usize = 65_507; socket.connect(&remote_addr).await?; let data = get_stdin_data()?; diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 8b64cbc3..f338bd84 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -1,3 +1,8 @@ +fn main() {} + +// Disabled while future of UdpFramed is decided on. +// See https://github.com/tokio-rs/tokio/issues/2830 +/* //! This example leverages `BytesCodec` to create a UDP client and server which //! speak a custom protocol. //! @@ -78,3 +83,4 @@ async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> { Ok(()) } +*/ |