From 02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 12 Nov 2020 20:07:43 -0800 Subject: net: add TcpStream::ready and non-blocking ops (#3130) Adds function to await for readiness on the TcpStream and non-blocking read/write functions. `async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified interest. There are also two shorthand functions, `readable()` and `writable()`. Once the stream is in a ready state, the caller may perform non-blocking operations on it using `try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready. The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in `AsyncFd` protect against a potential race between receiving the readiness notification and clearing it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()` and `try_write()` function handle clearing stream readiness as needed. This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These types will also be useful for fixing #3072 . Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this is left for later PRs. Refs: #3130 --- tokio/src/io/driver/interest.rs | 70 ++++++++-- tokio/src/io/driver/mod.rs | 8 +- tokio/src/io/driver/ready.rs | 120 ++++++++++++----- tokio/src/io/driver/registration.rs | 21 +++ tokio/src/io/driver/scheduled_io.rs | 13 +- tokio/src/io/mod.rs | 7 +- tokio/src/macros/cfg.rs | 13 ++ tokio/src/net/tcp/stream.rs | 262 +++++++++++++++++++++++++++++++++++- tokio/src/signal/unix/driver.rs | 4 +- tokio/tests/tcp_stream.rs | 112 +++++++++++++++ 10 files changed, 576 insertions(+), 54 deletions(-) create mode 100644 tokio/tests/tcp_stream.rs (limited to 'tokio') diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index f9887e86..8c8049df 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -1,3 +1,7 @@ +#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] + +use crate::io::driver::Ready; + use std::fmt; use std::ops; @@ -5,34 +9,84 @@ use std::ops; /// /// Specifies the readiness events the caller is interested in when awaiting on /// I/O resource readiness states. -#[derive(Clone, Copy)] -pub(crate) struct Interest(mio::Interest); +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct Interest(mio::Interest); impl Interest { - /// Interest in all readable events - pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE); + /// Interest in all readable events. + /// + /// Readable interest includes read-closed events. + pub const READABLE: Interest = Interest(mio::Interest::READABLE); /// Interest in all writable events - pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); + /// + /// Writable interest includes write-closed events. + pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); /// Returns true if the value includes readable interest. - pub(crate) const fn is_readable(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(Interest::READABLE.is_readable()); + /// assert!(!Interest::WRITABLE.is_readable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_readable()); + /// ``` + pub const fn is_readable(self) -> bool { self.0.is_readable() } /// Returns true if the value includes writable interest. - pub(crate) const fn is_writable(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(!Interest::READABLE.is_writable()); + /// assert!(Interest::WRITABLE.is_writable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_writable()); + /// ``` + pub const fn is_writable(self) -> bool { self.0.is_writable() } /// Add together two `Interst` values. - pub(crate) const fn add(self, other: Interest) -> Interest { + /// + /// This function works from a `const` context. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// + /// assert!(BOTH.is_readable()); + /// assert!(BOTH.is_writable()); + pub const fn add(self, other: Interest) -> Interest { Interest(self.0.add(other.0)) } + // This function must be crate-private to avoid exposing a `mio` dependency. pub(crate) const fn to_mio(self) -> mio::Interest { self.0 } + + pub(super) fn mask(self) -> Ready { + match self { + Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED, + Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED, + _ => Ready::EMPTY, + } + } } impl ops::BitOr for Interest { diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index c494db41..a1784dff 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,10 +1,12 @@ #![cfg_attr(not(feature = "rt"), allow(dead_code))] mod interest; -pub(crate) use interest::Interest; +#[allow(unreachable_pub)] +pub use interest::Interest; mod ready; -use ready::Ready; +#[allow(unreachable_pub)] +pub use ready::Ready; mod registration; pub(crate) use registration::Registration; @@ -51,7 +53,7 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, - ready: Ready, + pub(crate) ready: Ready, } pub(super) struct Inner { diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 2790cc13..2ac01bdb 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] + use std::fmt; use std::ops; @@ -6,36 +8,33 @@ const WRITABLE: usize = 0b0_10; const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; -/// A set of readiness event kinds. -/// -/// `Ready` is set of operation descriptors indicating which kind of an -/// operation is ready to be performed. +/// Describes the readiness state of an I/O resources. /// -/// This struct only represents portable event kinds. Portable events are -/// events that can be raised on any platform while guaranteeing no false -/// positives. +/// `Ready` tracks which operation an I/O resource is ready to perform. +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] #[derive(Clone, Copy, PartialEq, PartialOrd)] -pub(crate) struct Ready(usize); +pub struct Ready(usize); impl Ready { /// Returns the empty `Ready` set. - pub(crate) const EMPTY: Ready = Ready(0); + pub const EMPTY: Ready = Ready(0); /// Returns a `Ready` representing readable readiness. - pub(crate) const READABLE: Ready = Ready(READABLE); + pub const READABLE: Ready = Ready(READABLE); /// Returns a `Ready` representing writable readiness. - pub(crate) const WRITABLE: Ready = Ready(WRITABLE); + pub const WRITABLE: Ready = Ready(WRITABLE); /// Returns a `Ready` representing read closed readiness. - pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED); + pub const READ_CLOSED: Ready = Ready(READ_CLOSED); /// Returns a `Ready` representing write closed readiness. - pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); /// Returns a `Ready` representing readiness for all operations. - pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + // Must remain crate-private to avoid adding a public dependency on Mio. pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { let mut ready = Ready::EMPTY; @@ -59,27 +58,78 @@ impl Ready { } /// Returns true if `Ready` is the empty set - pub(crate) fn is_empty(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(Ready::EMPTY.is_empty()); + /// assert!(!Ready::READABLE.is_empty()); + /// ``` + pub fn is_empty(self) -> bool { self == Ready::EMPTY } - /// Returns true if the value includes readable readiness - pub(crate) fn is_readable(self) -> bool { + /// Returns `true` if the value includes `readable` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_readable()); + /// assert!(Ready::READABLE.is_readable()); + /// assert!(Ready::READ_CLOSED.is_readable()); + /// assert!(!Ready::WRITABLE.is_readable()); + /// ``` + pub fn is_readable(self) -> bool { self.contains(Ready::READABLE) || self.is_read_closed() } - /// Returns true if the value includes writable readiness - pub(crate) fn is_writable(self) -> bool { + /// Returns `true` if the value includes writable `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_writable()); + /// assert!(!Ready::READABLE.is_writable()); + /// assert!(Ready::WRITABLE.is_writable()); + /// assert!(Ready::WRITE_CLOSED.is_writable()); + /// ``` + pub fn is_writable(self) -> bool { self.contains(Ready::WRITABLE) || self.is_write_closed() } - /// Returns true if the value includes read closed readiness - pub(crate) fn is_read_closed(self) -> bool { + /// Returns `true` if the value includes read-closed `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_read_closed()); + /// assert!(!Ready::READABLE.is_read_closed()); + /// assert!(Ready::READ_CLOSED.is_read_closed()); + /// ``` + pub fn is_read_closed(self) -> bool { self.contains(Ready::READ_CLOSED) } - /// Returns true if the value includes write closed readiness - pub(crate) fn is_write_closed(self) -> bool { + /// Returns `true` if the value includes write-closed `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_write_closed()); + /// assert!(!Ready::WRITABLE.is_write_closed()); + /// assert!(Ready::WRITE_CLOSED.is_write_closed()); + /// ``` + pub fn is_write_closed(self) -> bool { self.contains(Ready::WRITE_CLOSED) } @@ -143,37 +193,37 @@ cfg_io_readiness! { } } -impl> ops::BitOr for Ready { +impl ops::BitOr for Ready { type Output = Ready; #[inline] - fn bitor(self, other: T) -> Ready { - Ready(self.0 | other.into().0) + fn bitor(self, other: Ready) -> Ready { + Ready(self.0 | other.0) } } -impl> ops::BitOrAssign for Ready { +impl ops::BitOrAssign for Ready { #[inline] - fn bitor_assign(&mut self, other: T) { - self.0 |= other.into().0; + fn bitor_assign(&mut self, other: Ready) { + self.0 |= other.0; } } -impl> ops::BitAnd for Ready { +impl ops::BitAnd for Ready { type Output = Ready; #[inline] - fn bitand(self, other: T) -> Ready { - Ready(self.0 & other.into().0) + fn bitand(self, other: Ready) -> Ready { + Ready(self.0 & other.0) } } -impl> ops::Sub for Ready { +impl ops::Sub for Ready { type Output = Ready; #[inline] - fn sub(self, other: T) -> Ready { - Ready(self.0 & !other.into().0) + fn sub(self, other: Ready) -> Ready { + Ready(self.0 & !other.0) } } diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index db9afdd7..93125814 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -182,6 +182,27 @@ impl Registration { } } } + + pub(crate) fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + let ev = self.shared.ready_event(interest); + + // Don't attempt the operation if the resource is not ready. + if ev.ready.is_empty() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + Err(io::ErrorKind::WouldBlock.into()) + } + res => res, + } + } } fn gone() -> io::Error { diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index ed3adc39..75d56232 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{Ready, ReadyEvent, Tick}; +use super::{Interest, Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -49,8 +49,6 @@ struct Waiters { } cfg_io_readiness! { - use crate::io::Interest; - #[derive(Debug)] struct Waiter { pointers: linked_list::Pointers, @@ -280,6 +278,15 @@ impl ScheduledIo { } } + pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { + let curr = self.readiness.load(Acquire); + + ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + } + } + /// Poll version of checking readiness for a certain direction. /// /// These are to support `AsyncRead` and `AsyncWrite` polling methods, diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 499633ee..14be3e06 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -204,9 +204,12 @@ pub use self::read_buf::ReadBuf; #[doc(no_inline)] pub use std::io::{Error, ErrorKind, Result, SeekFrom}; -cfg_io_driver! { +cfg_io_driver_impl! { pub(crate) mod driver; - pub(crate) use driver::Interest; + + cfg_net! { + pub use driver::{Interest, Ready}; + } mod poll_evented; diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index edf681a4..15216560 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -79,6 +79,19 @@ macro_rules! cfg_io_driver { } } +macro_rules! cfg_io_driver_impl { + ( $( $item:item )* ) => { + $( + #[cfg(any( + feature = "net", + feature = "process", + all(unix, feature = "signal"), + ))] + $item + )* + } +} + macro_rules! cfg_not_io_driver { ($($item:item)*) => { $( diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0a784b5f..2ac37a2b 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,5 +1,5 @@ use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; @@ -264,6 +264,266 @@ impl TcpStream { } } + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Examples + /// + /// Concurrently read and write to the stream on the same task without + /// splitting. + /// + /// ```no_run + /// use tokio::io::Interest; + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// // The buffer is **not** included in the async task and will only exist + /// // on the stack. + /// let mut data = [0; 1024]; + /// let n = stream.try_read(&mut data[..]).unwrap(); + /// + /// println!("GOT {:?}", &data[..n]); + /// } + /// + /// if ready.is_writable() { + /// // Write some data + /// stream.try_write(b"hello world").unwrap(); + /// } + /// } + /// } + /// ``` + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Wait for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` is usually + /// paired with `try_read()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Try to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: TcpStream::readable() + /// [`ready()`]: TcpStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(n)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKinid::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf = [0; 4096]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read(&self, buf: &mut [u8]) -> io::Result { + use std::io::Read; + + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) + } + + /// Wait for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` is usually + /// paired with `try_write()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Try to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` is usually + /// paired with `try_write()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write(&self, buf: &[u8]) -> io::Result { + use std::io::Write; + + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) + } + /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index 323bb9df..315f3bd5 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -2,8 +2,8 @@ //! Signal driver -use crate::io::driver::Driver as IoDriver; -use crate::io::{Interest, PollEvented}; +use crate::io::driver::{Driver as IoDriver, Interest}; +use crate::io::PollEvented; use crate::park::Park; use crate::signal::registry::globals; diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs new file mode 100644 index 00000000..784ade8a --- /dev/null +++ b/tokio/tests/tcp_stream.rs @@ -0,0 +1,112 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::Interest; +use tokio::net::{TcpListener, TcpStream}; +use tokio_test::task; +use tokio_test::{assert_pending, assert_ready_ok}; + +use std::io; + +#[tokio::test] +async fn try_read_write() { + const DATA: &[u8] = b"this is some data to write to the socket"; + + // Create listener + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = TcpStream::connect(listener.local_addr().unwrap()) + .await + .unwrap(); + let (server, _) = listener.accept().await.unwrap(); + let mut written = DATA.to_vec(); + + // Track the server receiving data + let mut readable = task::spawn(server.readable()); + assert_pending!(readable.poll()); + + // Write data. + client.writable().await.unwrap(); + assert_eq!(DATA.len(), client.try_write(DATA).unwrap()); + + // The task should be notified + while !readable.is_woken() { + tokio::task::yield_now().await; + } + + // Fill the write buffer + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write(DATA) { + Ok(n) => written.extend(&DATA[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await.unwrap(); + + match server.try_read(&mut read[i..]) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } + } + + assert_eq!(read, written); + } + + // Now, we listen for shutdown + drop(client); + + loop { + let ready = server.ready(Interest::READABLE).await.unwrap(); + + if ready.is_read_closed() { + return; + } else { + tokio::task::yield_now().await; + } + } +} + +#[test] +fn buffer_not_included_in_future() { + use std::mem; + + const N: usize = 4096; + + let fut = async { + let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); + + loop { + stream.readable().await.unwrap(); + + let mut buf = [0; N]; + let n = stream.try_read(&mut buf[..]).unwrap(); + + if n == 0 { + break; + } + } + }; + + let n = mem::size_of_val(&fut); + assert!(n < 1000); +} -- cgit v1.2.3