From e1256d8ca4c510c8417874660e74760d87178445 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 10 Nov 2020 09:40:20 -0800 Subject: io: update AsyncFd to use Registration (#3113) --- tokio/src/io/async_fd.rs | 269 +++++++++++++++++++------------------------ tokio/src/io/driver/mod.rs | 6 - tokio/src/io/registration.rs | 13 ++- tokio/tests/io_async_fd.rs | 6 +- 4 files changed, 129 insertions(+), 165 deletions(-) (limited to 'tokio') diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 14c29f41..617f42e0 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -1,12 +1,10 @@ -use std::os::unix::io::{AsRawFd, RawFd}; -use std::{task::Context, task::Poll}; - -use std::io; +use crate::io::driver::{Direction, Handle, ReadyEvent}; +use crate::io::registration::Registration; use mio::unix::SourceFd; - -use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo}; -use crate::util::slab; +use std::io; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::{task::Context, task::Poll}; /// Associates an IO object backed by a Unix file descriptor with the tokio /// reactor, allowing for readiness to be polled. The file descriptor must be of @@ -64,27 +62,9 @@ use crate::util::slab; /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard /// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream pub struct AsyncFd { - handle: Handle, - shared: slab::Ref, + registration: Registration, inner: Option, } - -impl AsRawFd for AsyncFd { - fn as_raw_fd(&self) -> RawFd { - self.inner.as_ref().unwrap().as_raw_fd() - } -} - -impl std::fmt::Debug for AsyncFd { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AsyncFd") - .field("inner", &self.inner) - .finish() - } -} - -const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); - /// Represents an IO-ready event detected on a particular file descriptor, which /// has not yet been acknowledged. This is a `must_use` structure to help ensure /// that you do not forget to explicitly clear (or not clear) the event. @@ -94,92 +74,7 @@ pub struct AsyncFdReadyGuard<'a, T: AsRawFd> { event: Option, } -impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ReadyGuard") - .field("async_fd", &self.async_fd) - .finish() - } -} - -impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { - /// Indicates to tokio that the file descriptor is no longer ready. The - /// internal readiness flag will be cleared, and tokio will wait for the - /// next edge-triggered readiness notification from the OS. - /// - /// It is critical that this function not be called unless your code - /// _actually observes_ that the file descriptor is _not_ ready. Do not call - /// it simply because, for example, a read succeeded; it should be called - /// when a read is observed to block. - /// - /// [`drop`]: method@std::mem::drop - pub fn clear_ready(&mut self) { - if let Some(event) = self.event.take() { - self.async_fd.shared.clear_readiness(event); - } - } - - /// This function should be invoked when you intentionally want to keep the - /// ready flag asserted. - /// - /// While this function is itself a no-op, it satisfies the `#[must_use]` - /// constraint on the [`AsyncFdReadyGuard`] type. - pub fn retain_ready(&mut self) { - // no-op - } - - /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error, - /// the readiness state associated with this file descriptor is cleared. - /// - /// This method helps ensure that the readiness state of the underlying file - /// descriptor remains in sync with the tokio-side readiness state, by - /// clearing the tokio-side state only when a [`WouldBlock`] condition - /// occurs. It is the responsibility of the caller to ensure that `f` - /// returns [`WouldBlock`] only if the file descriptor that originated this - /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to - /// create this `AsyncFdReadyGuard`. - /// - /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock - pub fn with_io(&mut self, f: impl FnOnce() -> io::Result) -> io::Result { - let result = f(); - - if let Err(e) = result.as_ref() { - if e.kind() == io::ErrorKind::WouldBlock { - self.clear_ready(); - } - } - - result - } - - /// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness - /// state associated with this file descriptor is cleared. - /// - /// This method helps ensure that the readiness state of the underlying file - /// descriptor remains in sync with the tokio-side readiness state, by - /// clearing the tokio-side state only when a [`Pending`] condition occurs. - /// It is the responsibility of the caller to ensure that `f` returns - /// [`Pending`] only if the file descriptor that originated this - /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to - /// create this `AsyncFdReadyGuard`. - /// - /// [`Pending`]: std::task::Poll::Pending - pub fn with_poll(&mut self, f: impl FnOnce() -> std::task::Poll) -> std::task::Poll { - let result = f(); - - if result.is_pending() { - self.clear_ready(); - } - - result - } -} - -impl Drop for AsyncFd { - fn drop(&mut self) { - let _ = self.take_inner(); - } -} +const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); impl AsyncFd { /// Creates an AsyncFd backed by (and taking ownership of) an object @@ -197,18 +92,11 @@ impl AsyncFd { pub(crate) fn new_with_handle(inner: T, handle: Handle) -> io::Result { let fd = inner.as_raw_fd(); - let shared = if let Some(inner) = handle.inner() { - inner.add_source(&mut SourceFd(&fd), ALL_INTEREST)? - } else { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to find event loop", - )); - }; + let registration = + Registration::new_with_interest_and_handle(&mut SourceFd(&fd), ALL_INTEREST, handle)?; Ok(AsyncFd { - handle, - shared, + registration, inner: Some(inner), }) } @@ -227,11 +115,11 @@ impl AsyncFd { fn take_inner(&mut self) -> Option { let fd = self.inner.as_ref().map(AsRawFd::as_raw_fd); + if let Some(fd) = fd { - if let Some(driver) = self.handle.inner() { - let _ = driver.deregister_source(&mut SourceFd(&fd)); - } + let _ = self.registration.deregister(&mut SourceFd(&fd)); } + self.inner.take() } @@ -257,15 +145,7 @@ impl AsyncFd { &'a self, cx: &mut Context<'_>, ) -> Poll>> { - let event = ready!(self.shared.poll_readiness(cx, Direction::Read)); - - if !self.handle.is_alive() { - return Err(io::Error::new( - io::ErrorKind::Other, - "IO driver has terminated", - )) - .into(); - } + let event = ready!(self.registration.poll_readiness(cx, Direction::Read))?; Ok(AsyncFdReadyGuard { async_fd: self, @@ -290,15 +170,7 @@ impl AsyncFd { &'a self, cx: &mut Context<'_>, ) -> Poll>> { - let event = ready!(self.shared.poll_readiness(cx, Direction::Write)); - - if !self.handle.is_alive() { - return Err(io::Error::new( - io::ErrorKind::Other, - "IO driver has terminated", - )) - .into(); - } + let event = ready!(self.registration.poll_readiness(cx, Direction::Write))?; Ok(AsyncFdReadyGuard { async_fd: self, @@ -308,16 +180,8 @@ impl AsyncFd { } async fn readiness(&self, interest: mio::Interest) -> io::Result> { - let event = self.shared.readiness(interest); + let event = self.registration.readiness(interest).await?; - if !self.handle.is_alive() { - return Err(io::Error::new( - io::ErrorKind::Other, - "IO driver has terminated", - )); - } - - let event = event.await; Ok(AsyncFdReadyGuard { async_fd: self, event: Some(event), @@ -340,3 +204,104 @@ impl AsyncFd { self.readiness(mio::Interest::WRITABLE).await } } + +impl AsRawFd for AsyncFd { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_ref().unwrap().as_raw_fd() + } +} + +impl std::fmt::Debug for AsyncFd { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncFd") + .field("inner", &self.inner) + .finish() + } +} + +impl Drop for AsyncFd { + fn drop(&mut self) { + let _ = self.take_inner(); + } +} + +impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> { + /// Indicates to tokio that the file descriptor is no longer ready. The + /// internal readiness flag will be cleared, and tokio will wait for the + /// next edge-triggered readiness notification from the OS. + /// + /// It is critical that this function not be called unless your code + /// _actually observes_ that the file descriptor is _not_ ready. Do not call + /// it simply because, for example, a read succeeded; it should be called + /// when a read is observed to block. + /// + /// [`drop`]: method@std::mem::drop + pub fn clear_ready(&mut self) { + if let Some(event) = self.event.take() { + self.async_fd.registration.clear_readiness(event); + } + } + + /// This function should be invoked when you intentionally want to keep the + /// ready flag asserted. + /// + /// While this function is itself a no-op, it satisfies the `#[must_use]` + /// constraint on the [`AsyncFdReadyGuard`] type. + pub fn retain_ready(&mut self) { + // no-op + } + + /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error, + /// the readiness state associated with this file descriptor is cleared. + /// + /// This method helps ensure that the readiness state of the underlying file + /// descriptor remains in sync with the tokio-side readiness state, by + /// clearing the tokio-side state only when a [`WouldBlock`] condition + /// occurs. It is the responsibility of the caller to ensure that `f` + /// returns [`WouldBlock`] only if the file descriptor that originated this + /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to + /// create this `AsyncFdReadyGuard`. + /// + /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock + pub fn with_io(&mut self, f: impl FnOnce() -> io::Result) -> io::Result { + let result = f(); + + if let Err(e) = result.as_ref() { + if e.kind() == io::ErrorKind::WouldBlock { + self.clear_ready(); + } + } + + result + } + + /// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness + /// state associated with this file descriptor is cleared. + /// + /// This method helps ensure that the readiness state of the underlying file + /// descriptor remains in sync with the tokio-side readiness state, by + /// clearing the tokio-side state only when a [`Pending`] condition occurs. + /// It is the responsibility of the caller to ensure that `f` returns + /// [`Pending`] only if the file descriptor that originated this + /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to + /// create this `AsyncFdReadyGuard`. + /// + /// [`Pending`]: std::task::Poll::Pending + pub fn with_poll(&mut self, f: impl FnOnce() -> std::task::Poll) -> std::task::Poll { + let result = f(); + + if result.is_pending() { + self.clear_ready(); + } + + result + } +} + +impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReadyGuard") + .field("async_fd", &self.async_fd) + .finish() + } +} diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index a0d8e6f2..a7d4b7b8 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -290,12 +290,6 @@ impl Handle { pub(super) fn inner(&self) -> Option> { self.inner.upgrade() } - - cfg_net_unix! { - pub(super) fn is_alive(&self) -> bool { - self.inner.strong_count() > 0 - } - } } impl Unpark for Handle { diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index ce6cffda..0b166490 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -117,18 +117,23 @@ impl Registration { cx: &mut Context<'_>, direction: Direction, ) -> Poll> { - if self.handle.inner().is_none() { - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); - } - // Keep track of task budget let coop = ready!(crate::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); + + if self.handle.inner().is_none() { + return Poll::Ready(Err(gone())); + } + coop.made_progress(); Poll::Ready(Ok(ev)) } } +fn gone() -> io::Error { + io::Error::new(io::ErrorKind::Other, "IO driver has terminated") +} + cfg_io_readiness! { impl Registration { pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result { diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs index f3b89b35..f8dc65fe 100644 --- a/tokio/tests/io_async_fd.rs +++ b/tokio/tests/io_async_fd.rs @@ -500,10 +500,10 @@ fn driver_shutdown_wakes_currently_pending() { std::mem::drop(rt); - // Being awoken by a rt drop does not return an error, currently... - let _ = futures::executor::block_on(readable).unwrap(); + // The future was initialized **before** dropping the rt + assert_err!(futures::executor::block_on(readable)); - // However, attempting to initiate a readiness wait when the rt is dropped is an error + // The future is initialized **after** dropping the rt. assert_err!(futures::executor::block_on(afd_a.readable())); } -- cgit v1.2.3