diff options
author | Carl Lerche <me@carllerche.com> | 2020-11-12 20:07:43 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-12 20:07:43 -0800 |
commit | 02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 (patch) | |
tree | 95788ea2d89de4af6021befa70a3f5d80034578a /tokio/src/net | |
parent | 685da8dadd8821d1053ce7ecaf01ab5ee231bef9 (diff) |
net: add TcpStream::ready and non-blocking ops (#3130)
Adds function to await for readiness on the TcpStream and non-blocking read/write functions.
`async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified
interest. There are also two shorthand functions, `readable()` and `writable()`.
Once the stream is in a ready state, the caller may perform non-blocking operations on it using
`try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready.
The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in
`AsyncFd` protect against a potential race between receiving the readiness notification and clearing
it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()`
and `try_write()` function handle clearing stream readiness as needed.
This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These
types will also be useful for fixing #3072 .
Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this
is left for later PRs.
Refs: #3130
Diffstat (limited to 'tokio/src/net')
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 262 |
1 files changed, 261 insertions, 1 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0a784b5f..2ac37a2b 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,5 +1,5 @@ use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; @@ -264,6 +264,266 @@ impl TcpStream { } } + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Examples + /// + /// Concurrently read and write to the stream on the same task without + /// splitting. + /// + /// ```no_run + /// use tokio::io::Interest; + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// // The buffer is **not** included in the async task and will only exist + /// // on the stack. + /// let mut data = [0; 1024]; + /// let n = stream.try_read(&mut data[..]).unwrap(); + /// + /// println!("GOT {:?}", &data[..n]); + /// } + /// + /// if ready.is_writable() { + /// // Write some data + /// stream.try_write(b"hello world").unwrap(); + /// } + /// } + /// } + /// ``` + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Wait for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` is usually + /// paired with `try_read()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Try to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: TcpStream::readable() + /// [`ready()`]: TcpStream::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(n)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKinid::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf = [0; 4096]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + use std::io::Read; + + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) + } + + /// Wait for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` is usually + /// paired with `try_write()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Try to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` is usually + /// paired with `try_write()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + use std::io::Write; + + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) + } + /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. |