diff options
author | Carl Lerche <me@carllerche.com> | 2020-11-12 20:07:43 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-12 20:07:43 -0800 |
commit | 02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 (patch) | |
tree | 95788ea2d89de4af6021befa70a3f5d80034578a /tokio/tests | |
parent | 685da8dadd8821d1053ce7ecaf01ab5ee231bef9 (diff) |
net: add TcpStream::ready and non-blocking ops (#3130)
Adds function to await for readiness on the TcpStream and non-blocking read/write functions.
`async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified
interest. There are also two shorthand functions, `readable()` and `writable()`.
Once the stream is in a ready state, the caller may perform non-blocking operations on it using
`try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready.
The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in
`AsyncFd` protect against a potential race between receiving the readiness notification and clearing
it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()`
and `try_write()` function handle clearing stream readiness as needed.
This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These
types will also be useful for fixing #3072 .
Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this
is left for later PRs.
Refs: #3130
Diffstat (limited to 'tokio/tests')
-rw-r--r-- | tokio/tests/tcp_stream.rs | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs new file mode 100644 index 00000000..784ade8a --- /dev/null +++ b/tokio/tests/tcp_stream.rs @@ -0,0 +1,112 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::Interest; +use tokio::net::{TcpListener, TcpStream}; +use tokio_test::task; +use tokio_test::{assert_pending, assert_ready_ok}; + +use std::io; + +#[tokio::test] +async fn try_read_write() { + const DATA: &[u8] = b"this is some data to write to the socket"; + + // Create listener + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = TcpStream::connect(listener.local_addr().unwrap()) + .await + .unwrap(); + let (server, _) = listener.accept().await.unwrap(); + let mut written = DATA.to_vec(); + + // Track the server receiving data + let mut readable = task::spawn(server.readable()); + assert_pending!(readable.poll()); + + // Write data. + client.writable().await.unwrap(); + assert_eq!(DATA.len(), client.try_write(DATA).unwrap()); + + // The task should be notified + while !readable.is_woken() { + tokio::task::yield_now().await; + } + + // Fill the write buffer + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write(DATA) { + Ok(n) => written.extend(&DATA[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await.unwrap(); + + match server.try_read(&mut read[i..]) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + + // Now, we listen for shutdown + drop(client); + + loop { + let ready = server.ready(Interest::READABLE).await.unwrap(); + + if ready.is_read_closed() { + return; + } else { + tokio::task::yield_now().await; + } + } +} + +#[test] +fn buffer_not_included_in_future() { + use std::mem; + + const N: usize = 4096; + + let fut = async { + let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); + + loop { + stream.readable().await.unwrap(); + + let mut buf = [0; N]; + let n = stream.try_read(&mut buf[..]).unwrap(); + + if n == 0 { + break; + } + } + }; + + let n = mem::size_of_val(&fut); + assert!(n < 1000); +} |