summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-11 09:28:21 -0800
committerGitHub <noreply@github.com>2020-11-11 09:28:21 -0800
commitce891a4df17e632f7557dd0cd1f1e8da89bd6ae4 (patch)
treefa5478c0b3bacacfc65bfbadbe1cdb92234d5b5f /tokio
parentd869e16990c5fc2cbda48b036708efa4b450e807 (diff)
io: driver internal cleanup (#3124)
* Removes duplicated code by moving it to `Registration`. * impl `Deref` for `PollEvented` to avoid `get_ref()`. * Avoid extra waker clones in I/O driver. * Add `Interest` wrapper around `mio::Interest`.
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/async_fd.rs15
-rw-r--r--tokio/src/io/driver/interest.rs58
-rw-r--r--tokio/src/io/driver/mod.rs14
-rw-r--r--tokio/src/io/driver/ready.rs8
-rw-r--r--tokio/src/io/driver/registration.rs (renamed from tokio/src/io/registration.rs)89
-rw-r--r--tokio/src/io/driver/scheduled_io.rs24
-rw-r--r--tokio/src/io/mod.rs3
-rw-r--r--tokio/src/io/poll_evented.rs228
-rw-r--r--tokio/src/macros/cfg.rs13
-rw-r--r--tokio/src/net/tcp/listener.rs23
-rw-r--r--tokio/src/net/tcp/stream.rs73
-rw-r--r--tokio/src/net/udp/socket.rs171
-rw-r--r--tokio/src/net/unix/datagram/socket.rs38
-rw-r--r--tokio/src/net/unix/listener.rs32
-rw-r--r--tokio/src/net/unix/stream.rs52
-rw-r--r--tokio/src/process/mod.rs34
-rw-r--r--tokio/src/process/unix/mod.rs58
-rw-r--r--tokio/src/signal/unix/driver.rs10
18 files changed, 438 insertions, 505 deletions
diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs
index 617f42e0..99f23fd6 100644
--- a/tokio/src/io/async_fd.rs
+++ b/tokio/src/io/async_fd.rs
@@ -1,5 +1,4 @@
-use crate::io::driver::{Direction, Handle, ReadyEvent};
-use crate::io::registration::Registration;
+use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
use mio::unix::SourceFd;
use std::io;
@@ -74,7 +73,7 @@ pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
event: Option<ReadyEvent>,
}
-const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
+const ALL_INTEREST: Interest = Interest::READABLE.add(Interest::WRITABLE);
impl<T: AsRawFd> AsyncFd<T> {
/// Creates an AsyncFd backed by (and taking ownership of) an object
@@ -145,7 +144,7 @@ impl<T: AsRawFd> AsyncFd<T> {
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
- let event = ready!(self.registration.poll_readiness(cx, Direction::Read))?;
+ let event = ready!(self.registration.poll_read_ready(cx))?;
Ok(AsyncFdReadyGuard {
async_fd: self,
@@ -170,7 +169,7 @@ impl<T: AsRawFd> AsyncFd<T> {
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
- let event = ready!(self.registration.poll_readiness(cx, Direction::Write))?;
+ let event = ready!(self.registration.poll_write_ready(cx))?;
Ok(AsyncFdReadyGuard {
async_fd: self,
@@ -179,7 +178,7 @@ impl<T: AsRawFd> AsyncFd<T> {
.into()
}
- async fn readiness(&self, interest: mio::Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
+ async fn readiness(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
let event = self.registration.readiness(interest).await?;
Ok(AsyncFdReadyGuard {
@@ -193,7 +192,7 @@ impl<T: AsRawFd> AsyncFd<T> {
///
/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
pub async fn readable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
- self.readiness(mio::Interest::READABLE).await
+ self.readiness(Interest::READABLE).await
}
/// Waits for the file descriptor to become writable, returning a
@@ -201,7 +200,7 @@ impl<T: AsRawFd> AsyncFd<T> {
///
/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
pub async fn writable(&self) -> io::Result<AsyncFdReadyGuard<'_, T>> {
- self.readiness(mio::Interest::WRITABLE).await
+ self.readiness(Interest::WRITABLE).await
}
}
diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs
new file mode 100644
index 00000000..f9887e86
--- /dev/null
+++ b/tokio/src/io/driver/interest.rs
@@ -0,0 +1,58 @@
+use std::fmt;
+use std::ops;
+
+/// Readiness event interest
+///
+/// Specifies the readiness events the caller is interested in when awaiting on
+/// I/O resource readiness states.
+#[derive(Clone, Copy)]
+pub(crate) struct Interest(mio::Interest);
+
+impl Interest {
+ /// Interest in all readable events
+ pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE);
+
+ /// Interest in all writable events
+ pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
+
+ /// Returns true if the value includes readable interest.
+ pub(crate) const fn is_readable(self) -> bool {
+ self.0.is_readable()
+ }
+
+ /// Returns true if the value includes writable interest.
+ pub(crate) const fn is_writable(self) -> bool {
+ self.0.is_writable()
+ }
+
+ /// Add together two `Interst` values.
+ pub(crate) const fn add(self, other: Interest) -> Interest {
+ Interest(self.0.add(other.0))
+ }
+
+ pub(crate) const fn to_mio(self) -> mio::Interest {
+ self.0
+ }
+}
+
+impl ops::BitOr for Interest {
+ type Output = Self;
+
+ #[inline]
+ fn bitor(self, other: Self) -> Self {
+ self.add(other)
+ }
+}
+
+impl ops::BitOrAssign for Interest {
+ #[inline]
+ fn bitor_assign(&mut self, other: Self) {
+ self.0 = (*self | other).0;
+ }
+}
+
+impl fmt::Debug for Interest {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(fmt)
+ }
+}
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index a7d4b7b8..c494db41 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -1,10 +1,16 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+mod interest;
+pub(crate) use interest::Interest;
+
mod ready;
use ready::Ready;
+mod registration;
+pub(crate) use registration::Registration;
+
mod scheduled_io;
-pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
+use scheduled_io::ScheduledIo;
use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
@@ -68,7 +74,7 @@ pub(super) struct Inner {
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-pub(super) enum Direction {
+enum Direction {
Read,
Write,
}
@@ -313,7 +319,7 @@ impl Inner {
pub(super) fn add_source(
&self,
source: &mut impl mio::event::Source,
- interest: mio::Interest,
+ interest: Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
@@ -325,7 +331,7 @@ impl Inner {
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
self.registry
- .register(source, mio::Token(token), interest)?;
+ .register(source, mio::Token(token), interest.to_mio())?;
Ok(shared)
}
diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs
index 8b556e94..2790cc13 100644
--- a/tokio/src/io/driver/ready.rs
+++ b/tokio/src/io/driver/ready.rs
@@ -114,8 +114,10 @@ impl Ready {
}
cfg_io_readiness! {
+ use crate::io::Interest;
+
impl Ready {
- pub(crate) fn from_interest(interest: mio::Interest) -> Ready {
+ pub(crate) fn from_interest(interest: Interest) -> Ready {
let mut ready = Ready::EMPTY;
if interest.is_readable() {
@@ -131,11 +133,11 @@ cfg_io_readiness! {
ready
}
- pub(crate) fn intersection(self, interest: mio::Interest) -> Ready {
+ pub(crate) fn intersection(self, interest: Interest) -> Ready {
Ready(self.0 & Ready::from_interest(interest).0)
}
- pub(crate) fn satisfies(self, interest: mio::Interest) -> bool {
+ pub(crate) fn satisfies(self, interest: Interest) -> bool {
self.0 & Ready::from_interest(interest).0 != 0
}
}
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/driver/registration.rs
index 0b166490..db9afdd7 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/driver/registration.rs
@@ -1,4 +1,4 @@
-use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
+use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo};
use crate::util::slab;
use mio::event::Source;
@@ -53,11 +53,12 @@ unsafe impl Sync for Registration {}
// ===== impl Registration =====
impl Registration {
- /// Registers the I/O resource with the default reactor, for a specific `mio::Interest`.
- /// `new_with_interest` should be used over `new` when you need control over the readiness state,
- /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if
- /// you are interested in those states, you will need to add them to the readiness state passed
- /// to this function.
+ /// Registers the I/O resource with the default reactor, for a specific
+ /// `Interest`. `new_with_interest` should be used over `new` when you need
+ /// control over the readiness state, such as when a file descriptor only
+ /// allows reads. This does not add `hup` or `error` so if you are
+ /// interested in those states, you will need to add them to the readiness
+ /// state passed to this function.
///
/// # Return
///
@@ -65,7 +66,7 @@ impl Registration {
/// - `Err` if an error was encountered during registration
pub(crate) fn new_with_interest_and_handle(
io: &mut impl Source,
- interest: mio::Interest,
+ interest: Interest,
handle: Handle,
) -> io::Result<Registration> {
let shared = if let Some(inner) = handle.inner() {
@@ -96,7 +97,7 @@ impl Registration {
/// no longer result in notifications getting sent for this registration.
///
/// `Err` is returned if an error is encountered.
- pub(super) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
+ pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
@@ -104,15 +105,47 @@ impl Registration {
inner.deregister_source(io)
}
- pub(super) fn clear_readiness(&self, event: ReadyEvent) {
+ pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
self.shared.clear_readiness(event);
}
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.poll_ready(cx, Direction::Read)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.poll_ready(cx, Direction::Write)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_read_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ self.poll_io(cx, Direction::Read, f)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_write_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ self.poll_io(cx, Direction::Write, f)
+ }
+
/// Polls for events on the I/O resource's `direction` readiness stream.
///
/// If called with a task context, notify the task when a new event is
/// received.
- pub(super) fn poll_readiness(
+ fn poll_ready(
&self,
cx: &mut Context<'_>,
direction: Direction,
@@ -128,6 +161,27 @@ impl Registration {
coop.made_progress();
Poll::Ready(Ok(ev))
}
+
+ fn poll_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ direction: Direction,
+ mut f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ loop {
+ let ev = ready!(self.poll_ready(cx, direction))?;
+
+ match f() {
+ Ok(ret) => {
+ return Poll::Ready(Ok(ret));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
+ }
}
fn gone() -> io::Error {
@@ -136,7 +190,7 @@ fn gone() -> io::Error {
cfg_io_readiness! {
impl Registration {
- pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
+ pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
use std::future::Future;
use std::pin::Pin;
@@ -151,5 +205,18 @@ cfg_io_readiness! {
Pin::new(&mut fut).poll(cx).map(Ok)
}).await
}
+
+ pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
+ loop {
+ let event = self.readiness(interest).await?;
+
+ match f() {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(event);
+ }
+ x => return x,
+ }
+ }
+ }
}
}
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 3aefb376..ed3adc39 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -49,6 +49,8 @@ struct Waiters {
}
cfg_io_readiness! {
+ use crate::io::Interest;
+
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
@@ -57,7 +59,7 @@ cfg_io_readiness! {
waker: Option<Waker>,
/// The interest this waiter is waiting on
- interest: mio::Interest,
+ interest: Interest,
is_ready: bool,
@@ -283,7 +285,7 @@ impl ScheduledIo {
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
/// which cannot use the `async fn` version. This uses reserved reader
/// and writer slots.
- pub(in crate::io) fn poll_readiness(
+ pub(super) fn poll_readiness(
&self,
cx: &mut Context<'_>,
direction: Direction,
@@ -299,7 +301,19 @@ impl ScheduledIo {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
};
- *slot = Some(cx.waker().clone());
+
+ // Avoid cloning the waker if one is already stored that matches the
+ // current task.
+ match slot {
+ Some(existing) => {
+ if !existing.will_wake(cx.waker()) {
+ *existing = cx.waker().clone();
+ }
+ }
+ None => {
+ *slot = Some(cx.waker().clone());
+ }
+ }
// Try again, in case the readiness was changed while we were
// taking the waiters lock
@@ -348,7 +362,7 @@ unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
/// An async version of `poll_readiness` which uses a linked list of wakers
- pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent {
+ pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
@@ -356,7 +370,7 @@ cfg_io_readiness! {
// we are borrowing the `UnsafeCell` possibly over await boundaries.
//
// Go figure.
- fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> {
+ fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
Readiness {
scheduled_io: self,
state: State::Init,
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 20d92233..499633ee 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -206,8 +206,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom};
cfg_io_driver! {
pub(crate) mod driver;
-
- mod registration;
+ pub(crate) use driver::Interest;
mod poll_evented;
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs
index 66a26346..803932ba 100644
--- a/tokio/src/io/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -1,13 +1,9 @@
-use crate::io::driver::{Direction, Handle, ReadyEvent};
-use crate::io::registration::Registration;
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::driver::{Handle, Interest, Registration};
use mio::event::Source;
use std::fmt;
-use std::io::{self, Read, Write};
-use std::marker::Unpin;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::io;
+use std::ops::Deref;
cfg_io_driver! {
/// Associates an I/O resource that implements the [`std::io::Read`] and/or
@@ -89,30 +85,32 @@ impl<E: Source> PollEvented<E> {
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new(io: E) -> io::Result<Self> {
- PollEvented::new_with_interest(io, mio::Interest::READABLE | mio::Interest::WRITABLE)
+ PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
}
- /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Interest`
- /// state. `new_with_interest` should be used over `new` when you need control over the readiness
- /// state, such as when a file descriptor only allows reads. This does not add `hup` or `error`
- /// so if you are interested in those states, you will need to add them to the readiness state
- /// passed to this function.
+ /// Creates a new `PollEvented` associated with the default reactor, for
+ /// specific `Interest` state. `new_with_interest` should be used over `new`
+ /// when you need control over the readiness state, such as when a file
+ /// descriptor only allows reads. This does not add `hup` or `error` so if
+ /// you are interested in those states, you will need to add them to the
+ /// readiness state passed to this function.
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
- /// The runtime is usually set implicitly when this function is called
- /// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ /// The runtime is usually set implicitly when this function is called from
+ /// a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
+ /// function.
#[cfg_attr(feature = "signal", allow(unused))]
- pub(crate) fn new_with_interest(io: E, interest: mio::Interest) -> io::Result<Self> {
+ pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
Self::new_with_interest_and_handle(io, interest, Handle::current())
}
pub(crate) fn new_with_interest_and_handle(
mut io: E,
- interest: mio::Interest,
+ interest: Interest,
handle: Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
@@ -122,177 +120,57 @@ impl<E: Source> PollEvented<E> {
})
}
- /// Returns a shared reference to the underlying I/O object this readiness
- /// stream is wrapping.
- #[cfg(any(feature = "net", feature = "process", feature = "signal"))]
- pub(crate) fn get_ref(&self) -> &E {
- self.io.as_ref().unwrap()
- }
-
- /// Returns a mutable reference to the underlying I/O object this readiness
- /// stream is wrapping.
- pub(crate) fn get_mut(&mut self) -> &mut E {
- self.io.as_mut().unwrap()
- }
-
- pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
- self.registration.clear_readiness(event);
+ /// Returns a reference to the registration
+ pub(crate) fn registration(&self) -> &Registration {
+ &self.registration
}
+}
- /// Checks the I/O resource's read readiness state.
- ///
- /// The mask argument allows specifying what readiness to notify on. This
- /// can be any value, including platform specific readiness, **except**
- /// `writable`. HUP is always implicitly included on platforms that support
- /// it.
- ///
- /// If the resource is not ready for a read then `Poll::Pending` is returned
- /// and the current task is notified once a new event is received.
- ///
- /// The I/O resource will remain in a read-ready state until readiness is
- /// cleared by calling [`clear_read_ready`].
- ///
- /// [`clear_read_ready`]: method@Self::clear_read_ready
- ///
- /// # Panics
- ///
- /// This function panics if:
- ///
- /// * `ready` includes writable.
- /// * called from outside of a task context.
- ///
- /// # Warning
- ///
- /// This method may not be called concurrently. It takes `&self` to allow
- /// calling it concurrently with `poll_write_ready`.
- pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.registration.poll_readiness(cx, Direction::Read)
- }
+feature! {
+ #![any(feature = "net", feature = "process")]
- /// Checks the I/O resource's write readiness state.
- ///
- /// This always checks for writable readiness and also checks for HUP
- /// readiness on platforms that support it.
- ///
- /// If the resource is not ready for a write then `Poll::Pending` is
- /// returned and the current task is notified once a new event is received.
- ///
- /// The I/O resource will remain in a write-ready state until readiness is
- /// cleared by calling [`clear_write_ready`].
- ///
- /// [`clear_write_ready`]: method@Self::clear_write_ready
- ///
- /// # Panics
- ///
- /// This function panics if:
- ///
- /// * `ready` contains bits besides `writable` and `hup`.
- /// * called from outside of a task context.
- ///
- /// # Warning
- ///
- /// This method may not be called concurrently. It takes `&self` to allow
- /// calling it concurrently with `poll_read_ready`.
- pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.registration.poll_readiness(cx, Direction::Write)
- }
-}
+ use crate::io::ReadBuf;
+ use std::task::{Context, Poll};
-cfg_io_readiness! {
impl<E: Source> PollEvented<E> {
- pub(crate) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
- self.registration.readiness(interest).await
- }
-
- pub(crate) async fn async_io<F, R>(&self, interest: mio::Interest, mut op: F) -> io::Result<R>
+ // Safety: The caller must ensure that `E` can read into uninitialized memory
+ pub(crate) unsafe fn poll_read<'a>(
+ &'a self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>>
where
- F: FnMut(&E) -> io::Result<R>,
+ &'a E: io::Read + 'a,
{
- loop {
- let event = self.readiness(interest).await?;
-
- match op(self.get_ref()) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(event);
- }
- x => return x,
- }
- }
+ use std::io::Read;
+
+ let n = ready!(self.registration.poll_read_io(cx, || {
+ let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
+ self.io.as_ref().unwrap().read(b)
+ }))?;
+
+ // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
+ // buffer.
+ buf.assume_init(n);
+ buf.advance(n);
+ Poll::Ready(Ok(()))
}
- }
-}
-
-// ===== Read / Write impls =====
-
-impl<E: Source + Read + Unpin> AsyncRead for PollEvented<E> {
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- loop {
- let ev = ready!(self.poll_read_ready(cx))?;
-
- // We can't assume the `Read` won't look at the read buffer,
- // so we have to force initialization here.
- let r = (*self).get_mut().read(buf.initialize_unfilled());
- if is_wouldblock(&r) {
- self.clear_readiness(ev);
- continue;
- }
-
- return Poll::Ready(r.map(|n| {
- buf.advance(n);
- }));
+ pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
+ where
+ &'a E: io::Write + 'a,
+ {
+ use std::io::Write;
+ self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf))
}
}
}
-impl<E: Source + Write + Unpin> AsyncWrite for PollEvented<E> {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- loop {
- let ev = ready!(self.poll_write_ready(cx))?;
+impl<E: Source> Deref for PollEvented<E> {
+ type Target = E;
- let r = (*self).get_mut().write(buf);
-
- if is_wouldblock(&r) {
- self.clear_readiness(ev);
- continue;
- }
-
- return Poll::Ready(r);
- }
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- loop {
- let ev = ready!(self.poll_write_ready(cx))?;
-
- let r = (*self).get_mut().flush();
-
- if is_wouldblock(&r) {
- self.clear_readiness(ev);
- continue;
- }
-
- return Poll::Ready(r);
- }
- }
-
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Poll::Ready(Ok(()))
- }
-}
-
-fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
- match *r {
- Ok(_) => false,
- Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
+ fn deref(&self) -> &E {
+ self.io.as_ref().unwrap()
}
}
diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs
index 27929119..edf681a4 100644
--- a/tokio/src/macros/cfg.rs
+++ b/tokio/src/macros/cfg.rs
@@ -1,5 +1,18 @@
#![allow(unused_macros)]
+macro_rules! feature {
+ (
+ #![$meta:meta]
+ $($item:item)*
+ ) => {
+ $(
+ #[cfg($meta)]
+ #[cfg_attr(docsrs, doc(cfg($meta)))]
+ $item
+ )*
+ }
+}
+
/// Enables enter::block_on
macro_rules! cfg_block_on {
($($item:item)*) => {
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index be528f2b..8b0a4803 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -1,4 +1,4 @@
-use crate::io::PollEvented;
+use crate::io::{Interest, PollEvented};
use crate::net::tcp::TcpStream;
use crate::net::{to_socket_addrs, ToSocketAddrs};
@@ -164,7 +164,8 @@ impl TcpListener {
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (mio, addr) = self
.io
- .async_io(mio::Interest::READABLE, |sock| sock.accept())
+ .registration()
+ .async_io(Interest::READABLE, || self.io.accept())
.await?;
let stream = TcpStream::new(mio)?;
@@ -181,15 +182,15 @@ impl TcpListener {
/// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> {
loop {
- let ev = ready!(self.io.poll_read_ready(cx))?;
+ let ev = ready!(self.io.registration().poll_read_ready(cx))?;
- match self.io.get_ref().accept() {
+ match self.io.accept() {
Ok((io, addr)) => {
let io = TcpStream::new(io)?;
return Poll::Ready(Ok((io, addr)));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_readiness(ev);
+ self.io.registration().clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
}
@@ -266,7 +267,7 @@ impl TcpListener {
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().local_addr()
+ self.io.local_addr()
}
/// Gets the value of the `IP_TTL` option for this socket.
@@ -293,7 +294,7 @@ impl TcpListener {
/// }
/// ```
pub fn ttl(&self) -> io::Result<u32> {
- self.io.get_ref().ttl()
+ self.io.ttl()
}
/// Sets the value for the `IP_TTL` option on this socket.
@@ -318,7 +319,7 @@ impl TcpListener {
/// }
/// ```
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
- self.io.get_ref().set_ttl(ttl)
+ self.io.set_ttl(ttl)
}
}
@@ -346,7 +347,7 @@ impl TryFrom<net::TcpListener> for TcpListener {
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.io.get_ref().fmt(f)
+ self.io.fmt(f)
}
}
@@ -357,7 +358,7 @@ mod sys {
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
- self.io.get_ref().as_raw_fd()
+ self.io.as_raw_fd()
}
}
}
@@ -369,7 +370,7 @@ mod sys {
impl AsRawSocket for TcpListener {
fn as_raw_socket(&self) -> RawSocket {
- self.io.get_ref().as_raw_socket()
+ self.io.as_raw_socket()
}
}
}
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 045cb6c3..0a784b5f 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -1,12 +1,12 @@
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
use std::fmt;
-use std::io::{self, Read, Write};
+use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};<