summaryrefslogtreecommitdiffstats
path: root/tokio
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-10 09:40:20 -0800
committerGitHub <noreply@github.com>2020-11-10 09:40:20 -0800
commite1256d8ca4c510c8417874660e74760d87178445 (patch)
tree08312ba9a260aa69ce9f5c3a1e1a4c76ae0205ed /tokio
parenta52f5071bf5ecf31c44e8aba5d8611400c50eb71 (diff)
io: update AsyncFd to use Registration (#3113)
Diffstat (limited to 'tokio')
-rw-r--r--tokio/src/io/async_fd.rs269
-rw-r--r--tokio/src/io/driver/mod.rs6
-rw-r--r--tokio/src/io/registration.rs13
-rw-r--r--tokio/tests/io_async_fd.rs6
4 files changed, 129 insertions, 165 deletions
diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs
index 14c29f41..617f42e0 100644
--- a/tokio/src/io/async_fd.rs
+++ b/tokio/src/io/async_fd.rs
@@ -1,12 +1,10 @@
-use std::os::unix::io::{AsRawFd, RawFd};
-use std::{task::Context, task::Poll};
-
-use std::io;
+use crate::io::driver::{Direction, Handle, ReadyEvent};
+use crate::io::registration::Registration;
use mio::unix::SourceFd;
-
-use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
-use crate::util::slab;
+use std::io;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::{task::Context, task::Poll};
/// Associates an IO object backed by a Unix file descriptor with the tokio
/// reactor, allowing for readiness to be polled. The file descriptor must be of
@@ -64,27 +62,9 @@ use crate::util::slab;
/// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
/// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
pub struct AsyncFd<T: AsRawFd> {
- handle: Handle,
- shared: slab::Ref<ScheduledIo>,
+ registration: Registration,
inner: Option<T>,
}
-
-impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
- fn as_raw_fd(&self) -> RawFd {
- self.inner.as_ref().unwrap().as_raw_fd()
- }
-}
-
-impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("AsyncFd")
- .field("inner", &self.inner)
- .finish()
- }
-}
-
-const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
-
/// Represents an IO-ready event detected on a particular file descriptor, which
/// has not yet been acknowledged. This is a `must_use` structure to help ensure
/// that you do not forget to explicitly clear (or not clear) the event.
@@ -94,92 +74,7 @@ pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
event: Option<ReadyEvent>,
}
-impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("ReadyGuard")
- .field("async_fd", &self.async_fd)
- .finish()
- }
-}
-
-impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
- /// Indicates to tokio that the file descriptor is no longer ready. The
- /// internal readiness flag will be cleared, and tokio will wait for the
- /// next edge-triggered readiness notification from the OS.
- ///
- /// It is critical that this function not be called unless your code
- /// _actually observes_ that the file descriptor is _not_ ready. Do not call
- /// it simply because, for example, a read succeeded; it should be called
- /// when a read is observed to block.
- ///
- /// [`drop`]: method@std::mem::drop
- pub fn clear_ready(&mut self) {
- if let Some(event) = self.event.take() {
- self.async_fd.shared.clear_readiness(event);
- }
- }
-
- /// This function should be invoked when you intentionally want to keep the
- /// ready flag asserted.
- ///
- /// While this function is itself a no-op, it satisfies the `#[must_use]`
- /// constraint on the [`AsyncFdReadyGuard`] type.
- pub fn retain_ready(&mut self) {
- // no-op
- }
-
- /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
- /// the readiness state associated with this file descriptor is cleared.
- ///
- /// This method helps ensure that the readiness state of the underlying file
- /// descriptor remains in sync with the tokio-side readiness state, by
- /// clearing the tokio-side state only when a [`WouldBlock`] condition
- /// occurs. It is the responsibility of the caller to ensure that `f`
- /// returns [`WouldBlock`] only if the file descriptor that originated this
- /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
- /// create this `AsyncFdReadyGuard`.
- ///
- /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
- pub fn with_io<R>(&mut self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
- let result = f();
-
- if let Err(e) = result.as_ref() {
- if e.kind() == io::ErrorKind::WouldBlock {
- self.clear_ready();
- }
- }
-
- result
- }
-
- /// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness
- /// state associated with this file descriptor is cleared.
- ///
- /// This method helps ensure that the readiness state of the underlying file
- /// descriptor remains in sync with the tokio-side readiness state, by
- /// clearing the tokio-side state only when a [`Pending`] condition occurs.
- /// It is the responsibility of the caller to ensure that `f` returns
- /// [`Pending`] only if the file descriptor that originated this
- /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
- /// create this `AsyncFdReadyGuard`.
- ///
- /// [`Pending`]: std::task::Poll::Pending
- pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
- let result = f();
-
- if result.is_pending() {
- self.clear_ready();
- }
-
- result
- }
-}
-
-impl<T: AsRawFd> Drop for AsyncFd<T> {
- fn drop(&mut self) {
- let _ = self.take_inner();
- }
-}
+const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE);
impl<T: AsRawFd> AsyncFd<T> {
/// Creates an AsyncFd backed by (and taking ownership of) an object
@@ -197,18 +92,11 @@ impl<T: AsRawFd> AsyncFd<T> {
pub(crate) fn new_with_handle(inner: T, handle: Handle) -> io::Result<Self> {
let fd = inner.as_raw_fd();
- let shared = if let Some(inner) = handle.inner() {
- inner.add_source(&mut SourceFd(&fd), ALL_INTEREST)?
- } else {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to find event loop",
- ));
- };
+ let registration =
+ Registration::new_with_interest_and_handle(&mut SourceFd(&fd), ALL_INTEREST, handle)?;
Ok(AsyncFd {
- handle,
- shared,
+ registration,
inner: Some(inner),
})
}
@@ -227,11 +115,11 @@ impl<T: AsRawFd> AsyncFd<T> {
fn take_inner(&mut self) -> Option<T> {
let fd = self.inner.as_ref().map(AsRawFd::as_raw_fd);
+
if let Some(fd) = fd {
- if let Some(driver) = self.handle.inner() {
- let _ = driver.deregister_source(&mut SourceFd(&fd));
- }
+ let _ = self.registration.deregister(&mut SourceFd(&fd));
}
+
self.inner.take()
}
@@ -257,15 +145,7 @@ impl<T: AsRawFd> AsyncFd<T> {
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
- let event = ready!(self.shared.poll_readiness(cx, Direction::Read));
-
- if !self.handle.is_alive() {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "IO driver has terminated",
- ))
- .into();
- }
+ let event = ready!(self.registration.poll_readiness(cx, Direction::Read))?;
Ok(AsyncFdReadyGuard {
async_fd: self,
@@ -290,15 +170,7 @@ impl<T: AsRawFd> AsyncFd<T> {
&'a self,
cx: &mut Context<'_>,
) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
- let event = ready!(self.shared.poll_readiness(cx, Direction::Write));
-
- if !self.handle.is_alive() {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "IO driver has terminated",
- ))
- .into();
- }
+ let event = ready!(self.registration.poll_readiness(cx, Direction::Write))?;
Ok(AsyncFdReadyGuard {
async_fd: self,
@@ -308,16 +180,8 @@ impl<T: AsRawFd> AsyncFd<T> {
}
async fn readiness(&self, interest: mio::Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
- let event = self.shared.readiness(interest);
+ let event = self.registration.readiness(interest).await?;
- if !self.handle.is_alive() {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "IO driver has terminated",
- ));
- }
-
- let event = event.await;
Ok(AsyncFdReadyGuard {
async_fd: self,
event: Some(event),
@@ -340,3 +204,104 @@ impl<T: AsRawFd> AsyncFd<T> {
self.readiness(mio::Interest::WRITABLE).await
}
}
+
+impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_ref().unwrap().as_raw_fd()
+ }
+}
+
+impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("AsyncFd")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+impl<T: AsRawFd> Drop for AsyncFd<T> {
+ fn drop(&mut self) {
+ let _ = self.take_inner();
+ }
+}
+
+impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
+ /// Indicates to tokio that the file descriptor is no longer ready. The
+ /// internal readiness flag will be cleared, and tokio will wait for the
+ /// next edge-triggered readiness notification from the OS.
+ ///
+ /// It is critical that this function not be called unless your code
+ /// _actually observes_ that the file descriptor is _not_ ready. Do not call
+ /// it simply because, for example, a read succeeded; it should be called
+ /// when a read is observed to block.
+ ///
+ /// [`drop`]: method@std::mem::drop
+ pub fn clear_ready(&mut self) {
+ if let Some(event) = self.event.take() {
+ self.async_fd.registration.clear_readiness(event);
+ }
+ }
+
+ /// This function should be invoked when you intentionally want to keep the
+ /// ready flag asserted.
+ ///
+ /// While this function is itself a no-op, it satisfies the `#[must_use]`
+ /// constraint on the [`AsyncFdReadyGuard`] type.
+ pub fn retain_ready(&mut self) {
+ // no-op
+ }
+
+ /// Performs the IO operation `f`; if `f` returns a [`WouldBlock`] error,
+ /// the readiness state associated with this file descriptor is cleared.
+ ///
+ /// This method helps ensure that the readiness state of the underlying file
+ /// descriptor remains in sync with the tokio-side readiness state, by
+ /// clearing the tokio-side state only when a [`WouldBlock`] condition
+ /// occurs. It is the responsibility of the caller to ensure that `f`
+ /// returns [`WouldBlock`] only if the file descriptor that originated this
+ /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
+ /// create this `AsyncFdReadyGuard`.
+ ///
+ /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
+ pub fn with_io<R>(&mut self, f: impl FnOnce() -> io::Result<R>) -> io::Result<R> {
+ let result = f();
+
+ if let Err(e) = result.as_ref() {
+ if e.kind() == io::ErrorKind::WouldBlock {
+ self.clear_ready();
+ }
+ }
+
+ result
+ }
+
+ /// Performs the IO operation `f`; if `f` returns [`Pending`], the readiness
+ /// state associated with this file descriptor is cleared.
+ ///
+ /// This method helps ensure that the readiness state of the underlying file
+ /// descriptor remains in sync with the tokio-side readiness state, by
+ /// clearing the tokio-side state only when a [`Pending`] condition occurs.
+ /// It is the responsibility of the caller to ensure that `f` returns
+ /// [`Pending`] only if the file descriptor that originated this
+ /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
+ /// create this `AsyncFdReadyGuard`.
+ ///
+ /// [`Pending`]: std::task::Poll::Pending
+ pub fn with_poll<R>(&mut self, f: impl FnOnce() -> std::task::Poll<R>) -> std::task::Poll<R> {
+ let result = f();
+
+ if result.is_pending() {
+ self.clear_ready();
+ }
+
+ result
+ }
+}
+
+impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ReadyGuard")
+ .field("async_fd", &self.async_fd)
+ .finish()
+ }
+}
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index a0d8e6f2..a7d4b7b8 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -290,12 +290,6 @@ impl Handle {
pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
-
- cfg_net_unix! {
- pub(super) fn is_alive(&self) -> bool {
- self.inner.strong_count() > 0
- }
- }
}
impl Unpark for Handle {
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index ce6cffda..0b166490 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -117,18 +117,23 @@ impl Registration {
cx: &mut Context<'_>,
direction: Direction,
) -> Poll<io::Result<ReadyEvent>> {
- if self.handle.inner().is_none() {
- return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
- }
-
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
+
+ if self.handle.inner().is_none() {
+ return Poll::Ready(Err(gone()));
+ }
+
coop.made_progress();
Poll::Ready(Ok(ev))
}
}
+fn gone() -> io::Error {
+ io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
+}
+
cfg_io_readiness! {
impl Registration {
pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
diff --git a/tokio/tests/io_async_fd.rs b/tokio/tests/io_async_fd.rs
index f3b89b35..f8dc65fe 100644
--- a/tokio/tests/io_async_fd.rs
+++ b/tokio/tests/io_async_fd.rs
@@ -500,10 +500,10 @@ fn driver_shutdown_wakes_currently_pending() {
std::mem::drop(rt);
- // Being awoken by a rt drop does not return an error, currently...
- let _ = futures::executor::block_on(readable).unwrap();
+ // The future was initialized **before** dropping the rt
+ assert_err!(futures::executor::block_on(readable));
- // However, attempting to initiate a readiness wait when the rt is dropped is an error
+ // The future is initialized **after** dropping the rt.
assert_err!(futures::executor::block_on(afd_a.readable()));
}