diff options
Diffstat (limited to 'tokio/tests/tcp_stream.rs')
-rw-r--r-- | tokio/tests/tcp_stream.rs | 112 |
1 files changed, 110 insertions, 2 deletions
diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index 784ade8a..84d58dc5 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -1,12 +1,16 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::io::Interest; +use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; use tokio::net::{TcpListener, TcpStream}; +use tokio::try_join; use tokio_test::task; -use tokio_test::{assert_pending, assert_ready_ok}; +use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; use std::io; +use std::task::Poll; + +use futures::future::poll_fn; #[tokio::test] async fn try_read_write() { @@ -110,3 +114,107 @@ fn buffer_not_included_in_future() { let n = mem::size_of_val(&fut); assert!(n < 1000); } + +macro_rules! assert_readable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await); + }; +} + +macro_rules! assert_not_readable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_read_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +macro_rules! assert_writable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await); + }; +} + +macro_rules! assert_not_writable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_write_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +#[tokio::test] +async fn poll_read_ready() { + let (mut client, mut server) = create_pair().await; + + // Initial state - not readable. + assert_not_readable_by_polling!(server); + + // There is data in the buffer - readable. + assert_ok!(client.write_all(b"ping").await); + assert_readable_by_polling!(server); + + // Readable until calls to `poll_read` return `Poll::Pending`. + let mut buf = [0u8; 4]; + assert_ok!(server.read_exact(&mut buf).await); + assert_readable_by_polling!(server); + read_until_pending(&mut server); + assert_not_readable_by_polling!(server); + + // Detect the client disconnect. + drop(client); + assert_readable_by_polling!(server); +} + +#[tokio::test] +async fn poll_write_ready() { + let (mut client, server) = create_pair().await; + + // Initial state - writable. + assert_writable_by_polling!(client); + + // No space to write - not writable. + write_until_pending(&mut client); + assert_not_writable_by_polling!(client); + + // Detect the server disconnect. + drop(server); + assert_writable_by_polling!(client); +} + +async fn create_pair() -> (TcpStream, TcpStream) { + let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept())); + (client, server) +} + +fn read_until_pending(stream: &mut TcpStream) { + let mut buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_read(&mut buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} + +fn write_until_pending(stream: &mut TcpStream) { + let buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_write(&buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} |