diff options
author | Carl Lerche <me@carllerche.com> | 2020-11-12 20:07:43 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-12 20:07:43 -0800 |
commit | 02b1117dca1c1e1fcc700bff4d6a93c33bfbc7d8 (patch) | |
tree | 95788ea2d89de4af6021befa70a3f5d80034578a /tokio/src/io/driver | |
parent | 685da8dadd8821d1053ce7ecaf01ab5ee231bef9 (diff) |
net: add TcpStream::ready and non-blocking ops (#3130)
Adds function to await for readiness on the TcpStream and non-blocking read/write functions.
`async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified
interest. There are also two shorthand functions, `readable()` and `writable()`.
Once the stream is in a ready state, the caller may perform non-blocking operations on it using
`try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready.
The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in
`AsyncFd` protect against a potential race between receiving the readiness notification and clearing
it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()`
and `try_write()` function handle clearing stream readiness as needed.
This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These
types will also be useful for fixing #3072 .
Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this
is left for later PRs.
Refs: #3130
Diffstat (limited to 'tokio/src/io/driver')
-rw-r--r-- | tokio/src/io/driver/interest.rs | 70 | ||||
-rw-r--r-- | tokio/src/io/driver/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/io/driver/ready.rs | 120 | ||||
-rw-r--r-- | tokio/src/io/driver/registration.rs | 21 | ||||
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 13 |
5 files changed, 183 insertions, 49 deletions
diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index f9887e86..8c8049df 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -1,3 +1,7 @@ +#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] + +use crate::io::driver::Ready; + use std::fmt; use std::ops; @@ -5,34 +9,84 @@ use std::ops; /// /// Specifies the readiness events the caller is interested in when awaiting on /// I/O resource readiness states. -#[derive(Clone, Copy)] -pub(crate) struct Interest(mio::Interest); +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct Interest(mio::Interest); impl Interest { - /// Interest in all readable events - pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE); + /// Interest in all readable events. + /// + /// Readable interest includes read-closed events. + pub const READABLE: Interest = Interest(mio::Interest::READABLE); /// Interest in all writable events - pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); + /// + /// Writable interest includes write-closed events. + pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); /// Returns true if the value includes readable interest. - pub(crate) const fn is_readable(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(Interest::READABLE.is_readable()); + /// assert!(!Interest::WRITABLE.is_readable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_readable()); + /// ``` + pub const fn is_readable(self) -> bool { self.0.is_readable() } /// Returns true if the value includes writable interest. - pub(crate) const fn is_writable(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(!Interest::READABLE.is_writable()); + /// assert!(Interest::WRITABLE.is_writable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_writable()); + /// ``` + pub const fn is_writable(self) -> bool { self.0.is_writable() } /// Add together two `Interst` values. - pub(crate) const fn add(self, other: Interest) -> Interest { + /// + /// This function works from a `const` context. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// + /// assert!(BOTH.is_readable()); + /// assert!(BOTH.is_writable()); + pub const fn add(self, other: Interest) -> Interest { Interest(self.0.add(other.0)) } + // This function must be crate-private to avoid exposing a `mio` dependency. pub(crate) const fn to_mio(self) -> mio::Interest { self.0 } + + pub(super) fn mask(self) -> Ready { + match self { + Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED, + Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED, + _ => Ready::EMPTY, + } + } } impl ops::BitOr for Interest { diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index c494db41..a1784dff 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,10 +1,12 @@ #![cfg_attr(not(feature = "rt"), allow(dead_code))] mod interest; -pub(crate) use interest::Interest; +#[allow(unreachable_pub)] +pub use interest::Interest; mod ready; -use ready::Ready; +#[allow(unreachable_pub)] +pub use ready::Ready; mod registration; pub(crate) use registration::Registration; @@ -51,7 +53,7 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, - ready: Ready, + pub(crate) ready: Ready, } pub(super) struct Inner { diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 2790cc13..2ac01bdb 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] + use std::fmt; use std::ops; @@ -6,36 +8,33 @@ const WRITABLE: usize = 0b0_10; const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; -/// A set of readiness event kinds. -/// -/// `Ready` is set of operation descriptors indicating which kind of an -/// operation is ready to be performed. +/// Describes the readiness state of an I/O resources. /// -/// This struct only represents portable event kinds. Portable events are -/// events that can be raised on any platform while guaranteeing no false -/// positives. +/// `Ready` tracks which operation an I/O resource is ready to perform. +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] #[derive(Clone, Copy, PartialEq, PartialOrd)] -pub(crate) struct Ready(usize); +pub struct Ready(usize); impl Ready { /// Returns the empty `Ready` set. - pub(crate) const EMPTY: Ready = Ready(0); + pub const EMPTY: Ready = Ready(0); /// Returns a `Ready` representing readable readiness. - pub(crate) const READABLE: Ready = Ready(READABLE); + pub const READABLE: Ready = Ready(READABLE); /// Returns a `Ready` representing writable readiness. - pub(crate) const WRITABLE: Ready = Ready(WRITABLE); + pub const WRITABLE: Ready = Ready(WRITABLE); /// Returns a `Ready` representing read closed readiness. - pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED); + pub const READ_CLOSED: Ready = Ready(READ_CLOSED); /// Returns a `Ready` representing write closed readiness. - pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); /// Returns a `Ready` representing readiness for all operations. - pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + // Must remain crate-private to avoid adding a public dependency on Mio. pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { let mut ready = Ready::EMPTY; @@ -59,27 +58,78 @@ impl Ready { } /// Returns true if `Ready` is the empty set - pub(crate) fn is_empty(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(Ready::EMPTY.is_empty()); + /// assert!(!Ready::READABLE.is_empty()); + /// ``` + pub fn is_empty(self) -> bool { self == Ready::EMPTY } - /// Returns true if the value includes readable readiness - pub(crate) fn is_readable(self) -> bool { + /// Returns `true` if the value includes `readable` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_readable()); + /// assert!(Ready::READABLE.is_readable()); + /// assert!(Ready::READ_CLOSED.is_readable()); + /// assert!(!Ready::WRITABLE.is_readable()); + /// ``` + pub fn is_readable(self) -> bool { self.contains(Ready::READABLE) || self.is_read_closed() } - /// Returns true if the value includes writable readiness - pub(crate) fn is_writable(self) -> bool { + /// Returns `true` if the value includes writable `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_writable()); + /// assert!(!Ready::READABLE.is_writable()); + /// assert!(Ready::WRITABLE.is_writable()); + /// assert!(Ready::WRITE_CLOSED.is_writable()); + /// ``` + pub fn is_writable(self) -> bool { self.contains(Ready::WRITABLE) || self.is_write_closed() } - /// Returns true if the value includes read closed readiness - pub(crate) fn is_read_closed(self) -> bool { + /// Returns `true` if the value includes read-closed `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_read_closed()); + /// assert!(!Ready::READABLE.is_read_closed()); + /// assert!(Ready::READ_CLOSED.is_read_closed()); + /// ``` + pub fn is_read_closed(self) -> bool { self.contains(Ready::READ_CLOSED) } - /// Returns true if the value includes write closed readiness - pub(crate) fn is_write_closed(self) -> bool { + /// Returns `true` if the value includes write-closed `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_write_closed()); + /// assert!(!Ready::WRITABLE.is_write_closed()); + /// assert!(Ready::WRITE_CLOSED.is_write_closed()); + /// ``` + pub fn is_write_closed(self) -> bool { self.contains(Ready::WRITE_CLOSED) } @@ -143,37 +193,37 @@ cfg_io_readiness! { } } -impl<T: Into<Ready>> ops::BitOr<T> for Ready { +impl ops::BitOr<Ready> for Ready { type Output = Ready; #[inline] - fn bitor(self, other: T) -> Ready { - Ready(self.0 | other.into().0) + fn bitor(self, other: Ready) -> Ready { + Ready(self.0 | other.0) } } -impl<T: Into<Ready>> ops::BitOrAssign<T> for Ready { +impl ops::BitOrAssign<Ready> for Ready { #[inline] - fn bitor_assign(&mut self, other: T) { - self.0 |= other.into().0; + fn bitor_assign(&mut self, other: Ready) { + self.0 |= other.0; } } -impl<T: Into<Ready>> ops::BitAnd<T> for Ready { +impl ops::BitAnd<Ready> for Ready { type Output = Ready; #[inline] - fn bitand(self, other: T) -> Ready { - Ready(self.0 & other.into().0) + fn bitand(self, other: Ready) -> Ready { + Ready(self.0 & other.0) } } -impl<T: Into<Ready>> ops::Sub<T> for Ready { +impl ops::Sub<Ready> for Ready { type Output = Ready; #[inline] - fn sub(self, other: T) -> Ready { - Ready(self.0 & !other.into().0) + fn sub(self, other: Ready) -> Ready { + Ready(self.0 & !other.0) } } diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index db9afdd7..93125814 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -182,6 +182,27 @@ impl Registration { } } } + + pub(crate) fn try_io<R>( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result<R>, + ) -> io::Result<R> { + let ev = self.shared.ready_event(interest); + + // Don't attempt the operation if the resource is not ready. + if ev.ready.is_empty() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + Err(io::ErrorKind::WouldBlock.into()) + } + res => res, + } + } } fn gone() -> io::Error { diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index ed3adc39..75d56232 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{Ready, ReadyEvent, Tick}; +use super::{Interest, Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -49,8 +49,6 @@ struct Waiters { } cfg_io_readiness! { - use crate::io::Interest; - #[derive(Debug)] struct Waiter { pointers: linked_list::Pointers<Waiter>, @@ -280,6 +278,15 @@ impl ScheduledIo { } } + pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { + let curr = self.readiness.load(Acquire); + + ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + } + } + /// Poll version of checking readiness for a certain direction. /// /// These are to support `AsyncRead` and `AsyncWrite` polling methods, |