summaryrefslogtreecommitdiffstats
path: root/tokio/src/net/tcp/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/net/tcp/stream.rs')
-rw-r--r--tokio/src/net/tcp/stream.rs262
1 files changed, 261 insertions, 1 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 0a784b5f..2ac37a2b 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -1,5 +1,5 @@
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
@@ -264,6 +264,266 @@ impl TcpStream {
}
}
+ /// Wait for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently read and write to the stream on the same task without
+ /// splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::Interest;
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// // The buffer is **not** included in the async task and will only exist
+ /// // on the stack.
+ /// let mut data = [0; 1024];
+ /// let n = stream.try_read(&mut data[..]).unwrap();
+ ///
+ /// println!("GOT {:?}", &data[..n]);
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Write some data
+ /// stream.try_write(b"hello world").unwrap();
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Wait for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// let mut msg = vec![0; 1024];
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut msg) {
+ /// Ok(n) => {
+ /// msg.truncate(n);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// println!("GOT = {:?}", msg);
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
+ /// Try to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: TcpStream::readable()
+ /// [`ready()`]: TcpStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(n)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKinid::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf = [0; 4096];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ use std::io::Read;
+
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read(buf))
+ }
+
+ /// Wait for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Try to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ use std::io::Write;
+
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
+ }
+
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.