summaryrefslogtreecommitdiffstats
path: root/examples
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-09-23 13:02:15 -0700
committerGitHub <noreply@github.com>2020-09-23 13:02:15 -0700
commita0557840eb424e174bf81a0175c40f9e176a2cc2 (patch)
tree676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /examples
parentf25f12d57638a2928b3f738b3b1392d8773e276e (diff)
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways: - Cleans up the cached readiness in `PollEvented`. This cache used to be helpful when readiness was a linked list of `*mut Node`s in `Registration`. Previous refactors have turned `Registration` into just an `AtomicUsize` holding the current readiness, so the cache is just extra work and complexity. Gone. - Polling the `Registration` for readiness now gives a `ReadyEvent`, which includes the driver tick. This event must be passed back into `clear_readiness`, so that the readiness is only cleared from `Registration` if the tick hasn't changed. Previously, it was possible to clear the readiness even though another thread had *just* polled the driver and found the socket ready again. - Registration now also contains an `async fn readiness`, which stores wakers in an instrusive linked list. This allows an unbounded number of tasks to register for readiness (previously, only 1 per direction (read and write)). By using the intrusive linked list, there is no concern of leaking the storage of the wakers, since they are stored inside the `async fn` and released when the future is dropped. - Registration retains a `poll_readiness(Direction)` method, to support `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and so there are 2 reserved slots for those methods. - IO types where it makes sense to have multiple tasks waiting on them now take advantage of this new `async fn readiness`, such as `UdpSocket` and `UnixDatagram`. Additionally, this makes the `io-driver` "feature" internal-only (no longer documented, not part of public API), and adds a second internal-only feature, `io-readiness`, to group together linked list part of registration that is only used by some of the IO types. After a bit of discussion, changing stream-based transports (like `TcpStream`) to have `async fn read(&self)` is punted, since that is likely too easy of a footgun to activate. Refs: #2779, #2728
Diffstat (limited to 'examples')
-rw-r--r--examples/connect.rs8
-rw-r--r--examples/echo-udp.rs2
-rw-r--r--examples/udp-client.rs2
-rw-r--r--examples/udp-codec.rs6
4 files changed, 11 insertions, 7 deletions
diff --git a/examples/connect.rs b/examples/connect.rs
index 5d0515a7..6e909b25 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -96,7 +96,6 @@ mod udp {
use std::error::Error;
use std::io;
use std::net::SocketAddr;
- use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
pub async fn connect(
@@ -114,16 +113,15 @@ mod udp {
let socket = UdpSocket::bind(&bind_addr).await?;
socket.connect(addr).await?;
- let (mut r, mut w) = socket.split();
- future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?;
+ future::try_join(send(stdin, &socket), recv(stdout, &socket)).await?;
Ok(())
}
async fn send(
mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
- writer: &mut SendHalf,
+ writer: &UdpSocket,
) -> Result<(), io::Error> {
while let Some(item) = stdin.next().await {
let buf = item?;
@@ -135,7 +133,7 @@ mod udp {
async fn recv(
mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
- reader: &mut RecvHalf,
+ reader: &UdpSocket,
) -> Result<(), io::Error> {
loop {
let mut buf = vec![0; 1024];
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index bc688b9b..3027c869 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -26,7 +26,7 @@ struct Server {
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
- mut socket,
+ socket,
mut buf,
mut to_send,
} = self;
diff --git a/examples/udp-client.rs b/examples/udp-client.rs
index a191033d..a394ee66 100644
--- a/examples/udp-client.rs
+++ b/examples/udp-client.rs
@@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
.parse()?;
- let mut socket = UdpSocket::bind(local_addr).await?;
+ let socket = UdpSocket::bind(local_addr).await?;
const MAX_DATAGRAM_SIZE: usize = 65_507;
socket.connect(&remote_addr).await?;
let data = get_stdin_data()?;
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 8b64cbc3..f338bd84 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -1,3 +1,8 @@
+fn main() {}
+
+// Disabled while future of UdpFramed is decided on.
+// See https://github.com/tokio-rs/tokio/issues/2830
+/*
//! This example leverages `BytesCodec` to create a UDP client and server which
//! speak a custom protocol.
//!
@@ -78,3 +83,4 @@ async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
Ok(())
}
+*/