summaryrefslogtreecommitdiffstats
path: root/tokio/src/net
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-12 20:07:43 -0800
committerGitHub <noreply@github.com>2020-11-12 20:07:43 -0800
commit02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 (patch)
tree95788ea2d89de4af6021befa70a3f5d80034578a /tokio/src/net
parent685da8dadd8821d1053ce7ecaf01ab5ee231bef9 (diff)
net: add TcpStream::ready and non-blocking ops (#3130)
Adds function to await for readiness on the TcpStream and non-blocking read/write functions. `async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified interest. There are also two shorthand functions, `readable()` and `writable()`. Once the stream is in a ready state, the caller may perform non-blocking operations on it using `try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready. The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in `AsyncFd` protect against a potential race between receiving the readiness notification and clearing it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()` and `try_write()` function handle clearing stream readiness as needed. This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These types will also be useful for fixing #3072 . Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this is left for later PRs. Refs: #3130
Diffstat (limited to 'tokio/src/net')
-rw-r--r--tokio/src/net/tcp/stream.rs262
1 files changed, 261 insertions, 1 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 0a784b5f..2ac37a2b 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -1,5 +1,5 @@
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
@@ -264,6 +264,266 @@ impl TcpStream {
}
}
+ /// Wait for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently read and write to the stream on the same task without
+ /// splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::Interest;
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// // The buffer is **not** included in the async task and will only exist
+ /// // on the stack.
+ /// let mut data = [0; 1024];
+ /// let n = stream.try_read(&mut data[..]).unwrap();
+ ///
+ /// println!("GOT {:?}", &data[..n]);
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Write some data
+ /// stream.try_write(b"hello world").unwrap();
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Wait for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// let mut msg = vec![0; 1024];
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut msg) {
+ /// Ok(n) => {
+ /// msg.truncate(n);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// println!("GOT = {:?}", msg);
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
+ /// Try to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: TcpStream::readable()
+ /// [`ready()`]: TcpStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(n)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKinid::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf = [0; 4096];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ use std::io::Read;
+
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read(buf))
+ }
+
+ /// Wait for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Try to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ use std::io::Write;
+
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
+ }
+
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.