summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/driver
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-11 09:28:21 -0800
committerGitHub <noreply@github.com>2020-11-11 09:28:21 -0800
commitce891a4df17e632f7557dd0cd1f1e8da89bd6ae4 (patch)
treefa5478c0b3bacacfc65bfbadbe1cdb92234d5b5f /tokio/src/io/driver
parentd869e16990c5fc2cbda48b036708efa4b450e807 (diff)
io: driver internal cleanup (#3124)
* Removes duplicated code by moving it to `Registration`. * impl `Deref` for `PollEvented` to avoid `get_ref()`. * Avoid extra waker clones in I/O driver. * Add `Interest` wrapper around `mio::Interest`.
Diffstat (limited to 'tokio/src/io/driver')
-rw-r--r--tokio/src/io/driver/interest.rs58
-rw-r--r--tokio/src/io/driver/mod.rs14
-rw-r--r--tokio/src/io/driver/ready.rs8
-rw-r--r--tokio/src/io/driver/registration.rs222
-rw-r--r--tokio/src/io/driver/scheduled_io.rs24
5 files changed, 314 insertions, 12 deletions
diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs
new file mode 100644
index 00000000..f9887e86
--- /dev/null
+++ b/tokio/src/io/driver/interest.rs
@@ -0,0 +1,58 @@
+use std::fmt;
+use std::ops;
+
+/// Readiness event interest
+///
+/// Specifies the readiness events the caller is interested in when awaiting on
+/// I/O resource readiness states.
+#[derive(Clone, Copy)]
+pub(crate) struct Interest(mio::Interest);
+
+impl Interest {
+ /// Interest in all readable events
+ pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE);
+
+ /// Interest in all writable events
+ pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
+
+ /// Returns true if the value includes readable interest.
+ pub(crate) const fn is_readable(self) -> bool {
+ self.0.is_readable()
+ }
+
+ /// Returns true if the value includes writable interest.
+ pub(crate) const fn is_writable(self) -> bool {
+ self.0.is_writable()
+ }
+
+ /// Add together two `Interst` values.
+ pub(crate) const fn add(self, other: Interest) -> Interest {
+ Interest(self.0.add(other.0))
+ }
+
+ pub(crate) const fn to_mio(self) -> mio::Interest {
+ self.0
+ }
+}
+
+impl ops::BitOr for Interest {
+ type Output = Self;
+
+ #[inline]
+ fn bitor(self, other: Self) -> Self {
+ self.add(other)
+ }
+}
+
+impl ops::BitOrAssign for Interest {
+ #[inline]
+ fn bitor_assign(&mut self, other: Self) {
+ self.0 = (*self | other).0;
+ }
+}
+
+impl fmt::Debug for Interest {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(fmt)
+ }
+}
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index a7d4b7b8..c494db41 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -1,10 +1,16 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+mod interest;
+pub(crate) use interest::Interest;
+
mod ready;
use ready::Ready;
+mod registration;
+pub(crate) use registration::Registration;
+
mod scheduled_io;
-pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
+use scheduled_io::ScheduledIo;
use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
@@ -68,7 +74,7 @@ pub(super) struct Inner {
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-pub(super) enum Direction {
+enum Direction {
Read,
Write,
}
@@ -313,7 +319,7 @@ impl Inner {
pub(super) fn add_source(
&self,
source: &mut impl mio::event::Source,
- interest: mio::Interest,
+ interest: Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
@@ -325,7 +331,7 @@ impl Inner {
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
self.registry
- .register(source, mio::Token(token), interest)?;
+ .register(source, mio::Token(token), interest.to_mio())?;
Ok(shared)
}
diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs
index 8b556e94..2790cc13 100644
--- a/tokio/src/io/driver/ready.rs
+++ b/tokio/src/io/driver/ready.rs
@@ -114,8 +114,10 @@ impl Ready {
}
cfg_io_readiness! {
+ use crate::io::Interest;
+
impl Ready {
- pub(crate) fn from_interest(interest: mio::Interest) -> Ready {
+ pub(crate) fn from_interest(interest: Interest) -> Ready {
let mut ready = Ready::EMPTY;
if interest.is_readable() {
@@ -131,11 +133,11 @@ cfg_io_readiness! {
ready
}
- pub(crate) fn intersection(self, interest: mio::Interest) -> Ready {
+ pub(crate) fn intersection(self, interest: Interest) -> Ready {
Ready(self.0 & Ready::from_interest(interest).0)
}
- pub(crate) fn satisfies(self, interest: mio::Interest) -> bool {
+ pub(crate) fn satisfies(self, interest: Interest) -> bool {
self.0 & Ready::from_interest(interest).0 != 0
}
}
diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs
new file mode 100644
index 00000000..db9afdd7
--- /dev/null
+++ b/tokio/src/io/driver/registration.rs
@@ -0,0 +1,222 @@
+use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo};
+use crate::util::slab;
+
+use mio::event::Source;
+use std::io;
+use std::task::{Context, Poll};
+
+cfg_io_driver! {
+ /// Associates an I/O resource with the reactor instance that drives it.
+ ///
+ /// A registration represents an I/O resource registered with a Reactor such
+ /// that it will receive task notifications on readiness. This is the lowest
+ /// level API for integrating with a reactor.
+ ///
+ /// The association between an I/O resource is made by calling [`new`]. Once
+ /// the association is established, it remains established until the
+ /// registration instance is dropped.
+ ///
+ /// A registration instance represents two separate readiness streams. One
+ /// for the read readiness and one for write readiness. These streams are
+ /// independent and can be consumed from separate tasks.
+ ///
+ /// **Note**: while `Registration` is `Sync`, the caller must ensure that
+ /// there are at most two tasks that use a registration instance
+ /// concurrently. One task for [`poll_read_ready`] and one task for
+ /// [`poll_write_ready`]. While violating this requirement is "safe" from a
+ /// Rust memory safety point of view, it will result in unexpected behavior
+ /// in the form of lost notifications and tasks hanging.
+ ///
+ /// ## Platform-specific events
+ ///
+ /// `Registration` also allows receiving platform-specific `mio::Ready`
+ /// events. These events are included as part of the read readiness event
+ /// stream. The write readiness event stream is only for `Ready::writable()`
+ /// events.
+ ///
+ /// [`new`]: method@Self::new
+ /// [`poll_read_ready`]: method@Self::poll_read_ready`
+ /// [`poll_write_ready`]: method@Self::poll_write_ready`
+ #[derive(Debug)]
+ pub(crate) struct Registration {
+ /// Handle to the associated driver.
+ handle: Handle,
+
+ /// Reference to state stored by the driver.
+ shared: slab::Ref<ScheduledIo>,
+ }
+}
+
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+
+// ===== impl Registration =====
+
+impl Registration {
+ /// Registers the I/O resource with the default reactor, for a specific
+ /// `Interest`. `new_with_interest` should be used over `new` when you need
+ /// control over the readiness state, such as when a file descriptor only
+ /// allows reads. This does not add `hup` or `error` so if you are
+ /// interested in those states, you will need to add them to the readiness
+ /// state passed to this function.
+ ///
+ /// # Return
+ ///
+ /// - `Ok` if the registration happened successfully
+ /// - `Err` if an error was encountered during registration
+ pub(crate) fn new_with_interest_and_handle(
+ io: &mut impl Source,
+ interest: Interest,
+ handle: Handle,
+ ) -> io::Result<Registration> {
+ let shared = if let Some(inner) = handle.inner() {
+ inner.add_source(io, interest)?
+ } else {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to find event loop",
+ ));
+ };
+
+ Ok(Registration { handle, shared })
+ }
+
+ /// Deregisters the I/O resource from the reactor it is associated with.
+ ///
+ /// This function must be called before the I/O resource associated with the
+ /// registration is dropped.
+ ///
+ /// Note that deregistering does not guarantee that the I/O resource can be
+ /// registered with a different reactor. Some I/O resource types can only be
+ /// associated with a single reactor instance for their lifetime.
+ ///
+ /// # Return
+ ///
+ /// If the deregistration was successful, `Ok` is returned. Any calls to
+ /// `Reactor::turn` that happen after a successful call to `deregister` will
+ /// no longer result in notifications getting sent for this registration.
+ ///
+ /// `Err` is returned if an error is encountered.
+ pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
+ let inner = match self.handle.inner() {
+ Some(inner) => inner,
+ None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
+ };
+ inner.deregister_source(io)
+ }
+
+ pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
+ self.shared.clear_readiness(event);
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.poll_ready(cx, Direction::Read)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.poll_ready(cx, Direction::Write)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_read_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ self.poll_io(cx, Direction::Read, f)
+ }
+
+ // Uses the poll path, requiring the caller to ensure mutual exclusion for
+ // correctness. Only the last task to call this function is notified.
+ pub(crate) fn poll_write_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ self.poll_io(cx, Direction::Write, f)
+ }
+
+ /// Polls for events on the I/O resource's `direction` readiness stream.
+ ///
+ /// If called with a task context, notify the task when a new event is
+ /// received.
+ fn poll_ready(
+ &self,
+ cx: &mut Context<'_>,
+ direction: Direction,
+ ) -> Poll<io::Result<ReadyEvent>> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+ let ev = ready!(self.shared.poll_readiness(cx, direction));
+
+ if self.handle.inner().is_none() {
+ return Poll::Ready(Err(gone()));
+ }
+
+ coop.made_progress();
+ Poll::Ready(Ok(ev))
+ }
+
+ fn poll_io<R>(
+ &self,
+ cx: &mut Context<'_>,
+ direction: Direction,
+ mut f: impl FnMut() -> io::Result<R>,
+ ) -> Poll<io::Result<R>> {
+ loop {
+ let ev = ready!(self.poll_ready(cx, direction))?;
+
+ match f() {
+ Ok(ret) => {
+ return Poll::Ready(Ok(ret));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
+ }
+}
+
+fn gone() -> io::Error {
+ io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
+}
+
+cfg_io_readiness! {
+ impl Registration {
+ pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
+ use std::future::Future;
+ use std::pin::Pin;
+
+ let fut = self.shared.readiness(interest);
+ pin!(fut);
+
+ crate::future::poll_fn(|cx| {
+ if self.handle.inner().is_none() {
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
+ }
+
+ Pin::new(&mut fut).poll(cx).map(Ok)
+ }).await
+ }
+
+ pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
+ loop {
+ let event = self.readiness(interest).await?;
+
+ match f() {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(event);
+ }
+ x => return x,
+ }
+ }
+ }
+ }
+}
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 3aefb376..ed3adc39 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -49,6 +49,8 @@ struct Waiters {
}
cfg_io_readiness! {
+ use crate::io::Interest;
+
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
@@ -57,7 +59,7 @@ cfg_io_readiness! {
waker: Option<Waker>,
/// The interest this waiter is waiting on
- interest: mio::Interest,
+ interest: Interest,
is_ready: bool,
@@ -283,7 +285,7 @@ impl ScheduledIo {
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
/// which cannot use the `async fn` version. This uses reserved reader
/// and writer slots.
- pub(in crate::io) fn poll_readiness(
+ pub(super) fn poll_readiness(
&self,
cx: &mut Context<'_>,
direction: Direction,
@@ -299,7 +301,19 @@ impl ScheduledIo {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
};
- *slot = Some(cx.waker().clone());
+
+ // Avoid cloning the waker if one is already stored that matches the
+ // current task.
+ match slot {
+ Some(existing) => {
+ if !existing.will_wake(cx.waker()) {
+ *existing = cx.waker().clone();
+ }
+ }
+ None => {
+ *slot = Some(cx.waker().clone());
+ }
+ }
// Try again, in case the readiness was changed while we were
// taking the waiters lock
@@ -348,7 +362,7 @@ unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
/// An async version of `poll_readiness` which uses a linked list of wakers
- pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent {
+ pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
@@ -356,7 +370,7 @@ cfg_io_readiness! {
// we are borrowing the `UnsafeCell` possibly over await boundaries.
//
// Go figure.
- fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> {
+ fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
Readiness {
scheduled_io: self,
state: State::Init,