summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-12 20:07:43 -0800
committerGitHub <noreply@github.com>2020-11-12 20:07:43 -0800
commit02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 (patch)
tree95788ea2d89de4af6021befa70a3f5d80034578a /tokio
parent685da8dadd8821d1053ce7ecaf01ab5ee231bef9 (diff)
net: add TcpStream::ready and non-blocking ops (#3130)
Adds function to await for readiness on the TcpStream and non-blocking read/write functions. `async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified interest. There are also two shorthand functions, `readable()` and `writable()`. Once the stream is in a ready state, the caller may perform non-blocking operations on it using `try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready. The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in `AsyncFd` protect against a potential race between receiving the readiness notification and clearing it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()` and `try_write()` function handle clearing stream readiness as needed. This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These types will also be useful for fixing #3072 . Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this is left for later PRs. Refs: #3130
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/driver/interest.rs70
-rw-r--r--tokio/src/io/driver/mod.rs8
-rw-r--r--tokio/src/io/driver/ready.rs120
-rw-r--r--tokio/src/io/driver/registration.rs21
-rw-r--r--tokio/src/io/driver/scheduled_io.rs13
-rw-r--r--tokio/src/io/mod.rs7
-rw-r--r--tokio/src/macros/cfg.rs13
-rw-r--r--tokio/src/net/tcp/stream.rs262
-rw-r--r--tokio/src/signal/unix/driver.rs4
-rw-r--r--tokio/tests/tcp_stream.rs112
10 files changed, 576 insertions, 54 deletions
diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs
index f9887e86..8c8049df 100644
--- a/tokio/src/io/driver/interest.rs
+++ b/tokio/src/io/driver/interest.rs
@@ -1,3 +1,7 @@
+#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
+
+use crate::io::driver::Ready;
+
use std::fmt;
use std::ops;
@@ -5,34 +9,84 @@ use std::ops;
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
-#[derive(Clone, Copy)]
-pub(crate) struct Interest(mio::Interest);
+#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct Interest(mio::Interest);
impl Interest {
- /// Interest in all readable events
- pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE);
+ /// Interest in all readable events.
+ ///
+ /// Readable interest includes read-closed events.
+ pub const READABLE: Interest = Interest(mio::Interest::READABLE);
/// Interest in all writable events
- pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
+ ///
+ /// Writable interest includes write-closed events.
+ pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
/// Returns true if the value includes readable interest.
- pub(crate) const fn is_readable(self) -> bool {
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Interest;
+ ///
+ /// assert!(Interest::READABLE.is_readable());
+ /// assert!(!Interest::WRITABLE.is_readable());
+ ///
+ /// let both = Interest::READABLE | Interest::WRITABLE;
+ /// assert!(both.is_readable());
+ /// ```
+ pub const fn is_readable(self) -> bool {
self.0.is_readable()
}
/// Returns true if the value includes writable interest.
- pub(crate) const fn is_writable(self) -> bool {
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Interest;
+ ///
+ /// assert!(!Interest::READABLE.is_writable());
+ /// assert!(Interest::WRITABLE.is_writable());
+ ///
+ /// let both = Interest::READABLE | Interest::WRITABLE;
+ /// assert!(both.is_writable());
+ /// ```
+ pub const fn is_writable(self) -> bool {
self.0.is_writable()
}
/// Add together two `Interst` values.
- pub(crate) const fn add(self, other: Interest) -> Interest {
+ ///
+ /// This function works from a `const` context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Interest;
+ ///
+ /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
+ ///
+ /// assert!(BOTH.is_readable());
+ /// assert!(BOTH.is_writable());
+ pub const fn add(self, other: Interest) -> Interest {
Interest(self.0.add(other.0))
}
+ // This function must be crate-private to avoid exposing a `mio` dependency.
pub(crate) const fn to_mio(self) -> mio::Interest {
self.0
}
+
+ pub(super) fn mask(self) -> Ready {
+ match self {
+ Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED,
+ Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
+ _ => Ready::EMPTY,
+ }
+ }
}
impl ops::BitOr for Interest {
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index c494db41..a1784dff 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -1,10 +1,12 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
mod interest;
-pub(crate) use interest::Interest;
+#[allow(unreachable_pub)]
+pub use interest::Interest;
mod ready;
-use ready::Ready;
+#[allow(unreachable_pub)]
+pub use ready::Ready;
mod registration;
pub(crate) use registration::Registration;
@@ -51,7 +53,7 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
- ready: Ready,
+ pub(crate) ready: Ready,
}
pub(super) struct Inner {
diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs
index 2790cc13..2ac01bdb 100644
--- a/tokio/src/io/driver/ready.rs
+++ b/tokio/src/io/driver/ready.rs
@@ -1,3 +1,5 @@
+#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
+
use std::fmt;
use std::ops;
@@ -6,36 +8,33 @@ const WRITABLE: usize = 0b0_10;
const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;
-/// A set of readiness event kinds.
-///
-/// `Ready` is set of operation descriptors indicating which kind of an
-/// operation is ready to be performed.
+/// Describes the readiness state of an I/O resources.
///
-/// This struct only represents portable event kinds. Portable events are
-/// events that can be raised on any platform while guaranteeing no false
-/// positives.
+/// `Ready` tracks which operation an I/O resource is ready to perform.
+#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, PartialEq, PartialOrd)]
-pub(crate) struct Ready(usize);
+pub struct Ready(usize);
impl Ready {
/// Returns the empty `Ready` set.
- pub(crate) const EMPTY: Ready = Ready(0);
+ pub const EMPTY: Ready = Ready(0);
/// Returns a `Ready` representing readable readiness.
- pub(crate) const READABLE: Ready = Ready(READABLE);
+ pub const READABLE: Ready = Ready(READABLE);
/// Returns a `Ready` representing writable readiness.
- pub(crate) const WRITABLE: Ready = Ready(WRITABLE);
+ pub const WRITABLE: Ready = Ready(WRITABLE);
/// Returns a `Ready` representing read closed readiness.
- pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED);
+ pub const READ_CLOSED: Ready = Ready(READ_CLOSED);
/// Returns a `Ready` representing write closed readiness.
- pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
+ pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
/// Returns a `Ready` representing readiness for all operations.
- pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
+ pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
+ // Must remain crate-private to avoid adding a public dependency on Mio.
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;
@@ -59,27 +58,78 @@ impl Ready {
}
/// Returns true if `Ready` is the empty set
- pub(crate) fn is_empty(self) -> bool {
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(Ready::EMPTY.is_empty());
+ /// assert!(!Ready::READABLE.is_empty());
+ /// ```
+ pub fn is_empty(self) -> bool {
self == Ready::EMPTY
}
- /// Returns true if the value includes readable readiness
- pub(crate) fn is_readable(self) -> bool {
+ /// Returns `true` if the value includes `readable`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_readable());
+ /// assert!(Ready::READABLE.is_readable());
+ /// assert!(Ready::READ_CLOSED.is_readable());
+ /// assert!(!Ready::WRITABLE.is_readable());
+ /// ```
+ pub fn is_readable(self) -> bool {
self.contains(Ready::READABLE) || self.is_read_closed()
}
- /// Returns true if the value includes writable readiness
- pub(crate) fn is_writable(self) -> bool {
+ /// Returns `true` if the value includes writable `readiness`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_writable());
+ /// assert!(!Ready::READABLE.is_writable());
+ /// assert!(Ready::WRITABLE.is_writable());
+ /// assert!(Ready::WRITE_CLOSED.is_writable());
+ /// ```
+ pub fn is_writable(self) -> bool {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}
- /// Returns true if the value includes read closed readiness
- pub(crate) fn is_read_closed(self) -> bool {
+ /// Returns `true` if the value includes read-closed `readiness`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_read_closed());
+ /// assert!(!Ready::READABLE.is_read_closed());
+ /// assert!(Ready::READ_CLOSED.is_read_closed());
+ /// ```
+ pub fn is_read_closed(self) -> bool {
self.contains(Ready::READ_CLOSED)
}
- /// Returns true if the value includes write closed readiness
- pub(crate) fn is_write_closed(self) -> bool {
+ /// Returns `true` if the value includes write-closed `readiness`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_write_closed());
+ /// assert!(!Ready::WRITABLE.is_write_closed());
+ /// assert!(Ready::WRITE_CLOSED.is_write_closed());
+ /// ```
+ pub fn is_write_closed(self) -> bool {
self.contains(Ready::WRITE_CLOSED)
}
@@ -143,37 +193,37 @@ cfg_io_readiness! {
}
}
-impl<T: Into<Ready>> ops::BitOr<T> for Ready {
+impl ops::BitOr<Ready> for Ready {
type Output = Ready;
#[inline]
- fn bitor(self, other: T) -> Ready {
- Ready(self.0 | other.into().0)
+ fn bitor(self, other: Ready) -> Ready {
+ Ready(self.0 | other.0)
}
}
-impl<T: Into<Ready>> ops::BitOrAssign<T> for Ready {
+impl ops::BitOrAssign<Ready> for Ready {
#[inline]
- fn bitor_assign(&mut self, other: T) {
- self.0 |= other.into().0;
+ fn bitor_assign(&mut self, other: Ready) {
+ self.0 |= other.0;
}
}
-impl<T: Into<Ready>> ops::BitAnd<T> for Ready {
+impl ops::BitAnd<Ready> for Ready {
type Output = Ready;
#[inline]
- fn bitand(self, other: T) -> Ready {
- Ready(self.0 & other.into().0)
+ fn bitand(self, other: Ready) -> Ready {
+ Ready(self.0 & other.0)
}
}
-impl<T: Into<Ready>> ops::Sub<T> for Ready {
+impl ops::Sub<Ready> for Ready {
type Output = Ready;
#[inline]
- fn sub(self, other: T) -> Ready {
- Ready(self.0 & !other.into().0)
+ fn sub(self, other: Ready) -> Ready {
+ Ready(self.0 & !other.0)
}
}
diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs
index db9afdd7..93125814 100644
--- a/tokio/src/io/driver/registration.rs
+++ b/tokio/src/io/driver/registration.rs
@@ -182,6 +182,27 @@ impl Registration {
}
}
}
+
+ pub(crate) fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ let ev = self.shared.ready_event(interest);
+
+ // Don't attempt the operation if the resource is not ready.
+ if ev.ready.is_empty() {
+ return Err(io::ErrorKind::WouldBlock.into());
+ }
+
+ match f() {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(ev);
+ Err(io::ErrorKind::WouldBlock.into())
+ }
+ res => res,
+ }
+ }
}
fn gone() -> io::Error {
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index ed3adc39..75d56232 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -1,4 +1,4 @@
-use super::{Ready, ReadyEvent, Tick};
+use super::{Interest, Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
@@ -49,8 +49,6 @@ struct Waiters {
}
cfg_io_readiness! {
- use crate::io::Interest;
-
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
@@ -280,6 +278,15 @@ impl ScheduledIo {
}
}
+ pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
+ let curr = self.readiness.load(Acquire);
+
+ ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
+ }
+ }
+
/// Poll version of checking readiness for a certain direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 499633ee..14be3e06 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -204,9 +204,12 @@ pub use self::read_buf::ReadBuf;
#[doc(no_inline)]
pub use std::io::{Error, ErrorKind, Result, SeekFrom};
-cfg_io_driver! {
+cfg_io_driver_impl! {
pub(crate) mod driver;
- pub(crate) use driver::Interest;
+
+ cfg_net! {
+ pub use driver::{Interest, Ready};
+ }
mod poll_evented;
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs
index edf681a4..15216560 100644
--- a/tokio/src/macros/cfg.rs
+++ b/tokio/src/macros/cfg.rs
@@ -79,6 +79,19 @@ macro_rules! cfg_io_driver {
}
}
+macro_rules! cfg_io_driver_impl {
+ ( $( $item:item )* ) => {
+ $(
+ #[cfg(any(
+ feature = "net",
+ feature = "process",
+ all(unix, feature = "signal"),
+ ))]
+ $item
+ )*
+ }
+}
+
macro_rules! cfg_not_io_driver {
($($item:item)*) => {
$(
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 0a784b5f..2ac37a2b 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -1,5 +1,5 @@
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
@@ -264,6 +264,266 @@ impl TcpStream {
}
}
+ /// Wait for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently read and write to the stream on the same task without
+ /// splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::Interest;
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// // The buffer is **not** included in the async task and will only exist
+ /// // on the stack.
+ /// let mut data = [0; 1024];
+ /// let n = stream.try_read(&mut data[..]).unwrap();
+ ///
+ /// println!("GOT {:?}", &data[..n]);
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Write some data
+ /// stream.try_write(b"hello world").unwrap();
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Wait for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// let mut msg = vec![0; 1024];
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut msg) {
+ /// Ok(n) => {
+ /// msg.truncate(n);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// println!("GOT = {:?}", msg);
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
+ /// Try to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: TcpStream::readable()
+ /// [`ready()`]: TcpStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(n)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKinid::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf = [0; 4096];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ use std::io::Read;
+
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read(buf))
+ }
+
+ /// Wait for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Try to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ use std::io::Write;
+
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
+ }
+
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs
index 323bb9df..315f3bd5 100644
--- a/tokio/src/signal/unix/driver.rs
+++ b/tokio/src/signal/unix/driver.rs
@@ -2,8 +2,8 @@
//! Signal driver
-use crate::io::driver::Driver as IoDriver;
-use crate::io::{Interest, PollEvented};
+use crate::io::driver::{Driver as IoDriver, Interest};
+use crate::io::PollEvented;
use crate::park::Park;
use crate::signal::registry::globals;
diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs
new file mode 100644
index 00000000..784ade8a
--- /dev/null
+++ b/tokio/tests/tcp_stream.rs
@@ -0,0 +1,112 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::io::Interest;
+use tokio::net::{TcpListener, TcpStream};
+use tokio_test::task;
+use tokio_test::{assert_pending, assert_ready_ok};
+
+use std::io;
+
+#[tokio::test]
+async fn try_read_write() {
+ const DATA: &[u8] = b"this is some data to write to the socket";
+
+ // Create listener
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = TcpStream::connect(listener.local_addr().unwrap())
+ .await
+ .unwrap();
+ let (server, _) = listener.accept().await.unwrap();
+ let mut written = DATA.to_vec();
+
+ // Track the server receiving data
+ let mut readable = task::spawn(server.readable());
+ assert_pending!(readable.poll());
+
+ // Write data.
+ client.writable().await.unwrap();
+ assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
+
+ // The task should be notified
+ while !readable.is_woken() {
+ tokio::task::yield_now().await;
+ }
+
+ // Fill the write buffer
+ loop {
+ // Still ready
+ let mut writable = task::spawn(client.writable());
+ assert_ready_ok!(writable.poll());
+
+ match client.try_write(DATA) {
+ Ok(n) => written.extend(&DATA[..n]),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ break;
+ }
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ {
+ // Write buffer full
+ let mut writable = task::spawn(client.writable());
+ assert_pending!(writable.poll());
+
+ // Drain the socket from the server end
+ let mut read = vec![0; written.len()];
+ let mut i = 0;
+
+ while i < read.len() {
+ server.readable().await.unwrap();
+
+ match server.try_read(&mut read[i..]) {
+ Ok(n) => i += n,
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("error = {:?}", e),
+ }
+ }
+
+ assert_eq!(read, written);
+ }
+
+ // Now, we listen for shutdown
+ drop(client);
+
+ loop {
+ let ready = server.ready(Interest::READABLE).await.unwrap();
+
+ if ready.is_read_closed() {
+ return;
+ } else {
+ tokio::task::yield_now().await;
+ }
+ }
+}
+
+#[test]
+fn buffer_not_included_in_future() {
+ use std::mem;
+
+ const N: usize = 4096;
+
+ let fut = async {
+ let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
+
+ loop {
+ stream.readable().await.unwrap();
+
+ let mut buf = [0; N];
+ let n = stream.try_read(&mut buf[..]).unwrap();
+
+ if n == 0 {
+ break;
+ }
+ }
+ };
+
+ let n = mem::size_of_val(&fut);
+ assert!(n < 1000);
+}