summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/driver
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-12 20:07:43 -0800
committerGitHub <noreply@github.com>2020-11-12 20:07:43 -0800
commit02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 (patch)
tree95788ea2d89de4af6021befa70a3f5d80034578a /tokio/src/io/driver
parent685da8dadd8821d1053ce7ecaf01ab5ee231bef9 (diff)
net: add TcpStream::ready and non-blocking ops (#3130)
Adds function to await for readiness on the TcpStream and non-blocking read/write functions. `async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified interest. There are also two shorthand functions, `readable()` and `writable()`. Once the stream is in a ready state, the caller may perform non-blocking operations on it using `try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready. The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in `AsyncFd` protect against a potential race between receiving the readiness notification and clearing it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()` and `try_write()` function handle clearing stream readiness as needed. This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These types will also be useful for fixing #3072 . Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this is left for later PRs. Refs: #3130
Diffstat (limited to 'tokio/src/io/driver')
-rw-r--r--tokio/src/io/driver/interest.rs70
-rw-r--r--tokio/src/io/driver/mod.rs8
-rw-r--r--tokio/src/io/driver/ready.rs120
-rw-r--r--tokio/src/io/driver/registration.rs21
-rw-r--r--tokio/src/io/driver/scheduled_io.rs13
5 files changed, 183 insertions, 49 deletions
diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs
index f9887e86..8c8049df 100644
--- a/tokio/src/io/driver/interest.rs
+++ b/tokio/src/io/driver/interest.rs
@@ -1,3 +1,7 @@
+#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
+
+use crate::io::driver::Ready;
+
use std::fmt;
use std::ops;
@@ -5,34 +9,84 @@ use std::ops;
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
-#[derive(Clone, Copy)]
-pub(crate) struct Interest(mio::Interest);
+#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub struct Interest(mio::Interest);
impl Interest {
- /// Interest in all readable events
- pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE);
+ /// Interest in all readable events.
+ ///
+ /// Readable interest includes read-closed events.
+ pub const READABLE: Interest = Interest(mio::Interest::READABLE);
/// Interest in all writable events
- pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
+ ///
+ /// Writable interest includes write-closed events.
+ pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
/// Returns true if the value includes readable interest.
- pub(crate) const fn is_readable(self) -> bool {
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Interest;
+ ///
+ /// assert!(Interest::READABLE.is_readable());
+ /// assert!(!Interest::WRITABLE.is_readable());
+ ///
+ /// let both = Interest::READABLE | Interest::WRITABLE;
+ /// assert!(both.is_readable());
+ /// ```
+ pub const fn is_readable(self) -> bool {
self.0.is_readable()
}
/// Returns true if the value includes writable interest.
- pub(crate) const fn is_writable(self) -> bool {
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Interest;
+ ///
+ /// assert!(!Interest::READABLE.is_writable());
+ /// assert!(Interest::WRITABLE.is_writable());
+ ///
+ /// let both = Interest::READABLE | Interest::WRITABLE;
+ /// assert!(both.is_writable());
+ /// ```
+ pub const fn is_writable(self) -> bool {
self.0.is_writable()
}
/// Add together two `Interst` values.
- pub(crate) const fn add(self, other: Interest) -> Interest {
+ ///
+ /// This function works from a `const` context.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Interest;
+ ///
+ /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
+ ///
+ /// assert!(BOTH.is_readable());
+ /// assert!(BOTH.is_writable());
+ pub const fn add(self, other: Interest) -> Interest {
Interest(self.0.add(other.0))
}
+ // This function must be crate-private to avoid exposing a `mio` dependency.
pub(crate) const fn to_mio(self) -> mio::Interest {
self.0
}
+
+ pub(super) fn mask(self) -> Ready {
+ match self {
+ Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED,
+ Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
+ _ => Ready::EMPTY,
+ }
+ }
}
impl ops::BitOr for Interest {
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index c494db41..a1784dff 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -1,10 +1,12 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
mod interest;
-pub(crate) use interest::Interest;
+#[allow(unreachable_pub)]
+pub use interest::Interest;
mod ready;
-use ready::Ready;
+#[allow(unreachable_pub)]
+pub use ready::Ready;
mod registration;
pub(crate) use registration::Registration;
@@ -51,7 +53,7 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
- ready: Ready,
+ pub(crate) ready: Ready,
}
pub(super) struct Inner {
diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs
index 2790cc13..2ac01bdb 100644
--- a/tokio/src/io/driver/ready.rs
+++ b/tokio/src/io/driver/ready.rs
@@ -1,3 +1,5 @@
+#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
+
use std::fmt;
use std::ops;
@@ -6,36 +8,33 @@ const WRITABLE: usize = 0b0_10;
const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;
-/// A set of readiness event kinds.
-///
-/// `Ready` is set of operation descriptors indicating which kind of an
-/// operation is ready to be performed.
+/// Describes the readiness state of an I/O resources.
///
-/// This struct only represents portable event kinds. Portable events are
-/// events that can be raised on any platform while guaranteeing no false
-/// positives.
+/// `Ready` tracks which operation an I/O resource is ready to perform.
+#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, PartialEq, PartialOrd)]
-pub(crate) struct Ready(usize);
+pub struct Ready(usize);
impl Ready {
/// Returns the empty `Ready` set.
- pub(crate) const EMPTY: Ready = Ready(0);
+ pub const EMPTY: Ready = Ready(0);
/// Returns a `Ready` representing readable readiness.
- pub(crate) const READABLE: Ready = Ready(READABLE);
+ pub const READABLE: Ready = Ready(READABLE);
/// Returns a `Ready` representing writable readiness.
- pub(crate) const WRITABLE: Ready = Ready(WRITABLE);
+ pub const WRITABLE: Ready = Ready(WRITABLE);
/// Returns a `Ready` representing read closed readiness.
- pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED);
+ pub const READ_CLOSED: Ready = Ready(READ_CLOSED);
/// Returns a `Ready` representing write closed readiness.
- pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
+ pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
/// Returns a `Ready` representing readiness for all operations.
- pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
+ pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
+ // Must remain crate-private to avoid adding a public dependency on Mio.
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;
@@ -59,27 +58,78 @@ impl Ready {
}
/// Returns true if `Ready` is the empty set
- pub(crate) fn is_empty(self) -> bool {
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(Ready::EMPTY.is_empty());
+ /// assert!(!Ready::READABLE.is_empty());
+ /// ```
+ pub fn is_empty(self) -> bool {
self == Ready::EMPTY
}
- /// Returns true if the value includes readable readiness
- pub(crate) fn is_readable(self) -> bool {
+ /// Returns `true` if the value includes `readable`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_readable());
+ /// assert!(Ready::READABLE.is_readable());
+ /// assert!(Ready::READ_CLOSED.is_readable());
+ /// assert!(!Ready::WRITABLE.is_readable());
+ /// ```
+ pub fn is_readable(self) -> bool {
self.contains(Ready::READABLE) || self.is_read_closed()
}
- /// Returns true if the value includes writable readiness
- pub(crate) fn is_writable(self) -> bool {
+ /// Returns `true` if the value includes writable `readiness`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_writable());
+ /// assert!(!Ready::READABLE.is_writable());
+ /// assert!(Ready::WRITABLE.is_writable());
+ /// assert!(Ready::WRITE_CLOSED.is_writable());
+ /// ```
+ pub fn is_writable(self) -> bool {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}
- /// Returns true if the value includes read closed readiness
- pub(crate) fn is_read_closed(self) -> bool {
+ /// Returns `true` if the value includes read-closed `readiness`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_read_closed());
+ /// assert!(!Ready::READABLE.is_read_closed());
+ /// assert!(Ready::READ_CLOSED.is_read_closed());
+ /// ```
+ pub fn is_read_closed(self) -> bool {
self.contains(Ready::READ_CLOSED)
}
- /// Returns true if the value includes write closed readiness
- pub(crate) fn is_write_closed(self) -> bool {
+ /// Returns `true` if the value includes write-closed `readiness`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::io::Ready;
+ ///
+ /// assert!(!Ready::EMPTY.is_write_closed());
+ /// assert!(!Ready::WRITABLE.is_write_closed());
+ /// assert!(Ready::WRITE_CLOSED.is_write_closed());
+ /// ```
+ pub fn is_write_closed(self) -> bool {
self.contains(Ready::WRITE_CLOSED)
}
@@ -143,37 +193,37 @@ cfg_io_readiness! {
}
}
-impl<T: Into<Ready>> ops::BitOr<T> for Ready {
+impl ops::BitOr<Ready> for Ready {
type Output = Ready;
#[inline]
- fn bitor(self, other: T) -> Ready {
- Ready(self.0 | other.into().0)
+ fn bitor(self, other: Ready) -> Ready {
+ Ready(self.0 | other.0)
}
}
-impl<T: Into<Ready>> ops::BitOrAssign<T> for Ready {
+impl ops::BitOrAssign<Ready> for Ready {
#[inline]
- fn bitor_assign(&mut self, other: T) {
- self.0 |= other.into().0;
+ fn bitor_assign(&mut self, other: Ready) {
+ self.0 |= other.0;
}
}
-impl<T: Into<Ready>> ops::BitAnd<T> for Ready {
+impl ops::BitAnd<Ready> for Ready {
type Output = Ready;
#[inline]
- fn bitand(self, other: T) -> Ready {
- Ready(self.0 & other.into().0)
+ fn bitand(self, other: Ready) -> Ready {
+ Ready(self.0 & other.0)
}
}
-impl<T: Into<Ready>> ops::Sub<T> for Ready {
+impl ops::Sub<Ready> for Ready {
type Output = Ready;
#[inline]
- fn sub(self, other: T) -> Ready {
- Ready(self.0 & !other.into().0)
+ fn sub(self, other: Ready) -> Ready {
+ Ready(self.0 & !other.0)
}
}
diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs
index db9afdd7..93125814 100644
--- a/tokio/src/io/driver/registration.rs
+++ b/tokio/src/io/driver/registration.rs
@@ -182,6 +182,27 @@ impl Registration {
}
}
}
+
+ pub(crate) fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ let ev = self.shared.ready_event(interest);
+
+ // Don't attempt the operation if the resource is not ready.
+ if ev.ready.is_empty() {
+ return Err(io::ErrorKind::WouldBlock.into());
+ }
+
+ match f() {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(ev);
+ Err(io::ErrorKind::WouldBlock.into())
+ }
+ res => res,
+ }
+ }
}
fn gone() -> io::Error {
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index ed3adc39..75d56232 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -1,4 +1,4 @@
-use super::{Ready, ReadyEvent, Tick};
+use super::{Interest, Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
@@ -49,8 +49,6 @@ struct Waiters {
}
cfg_io_readiness! {
- use crate::io::Interest;
-
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
@@ -280,6 +278,15 @@ impl ScheduledIo {
}
}
+ pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
+ let curr = self.readiness.load(Acquire);
+
+ ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
+ }
+ }
+
/// Poll version of checking readiness for a certain direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,