diff options
author | Carl Lerche <me@carllerche.com> | 2020-11-11 09:28:21 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-11 09:28:21 -0800 |
commit | ce891a4df17e632f7557dd0cd1f1e8da89bd6ae4 (patch) | |
tree | fa5478c0b3bacacfc65bfbadbe1cdb92234d5b5f /tokio/src/io/driver | |
parent | d869e16990c5fc2cbda48b036708efa4b450e807 (diff) |
io: driver internal cleanup (#3124)
* Removes duplicated code by moving it to `Registration`.
* impl `Deref` for `PollEvented` to avoid `get_ref()`.
* Avoid extra waker clones in I/O driver.
* Add `Interest` wrapper around `mio::Interest`.
Diffstat (limited to 'tokio/src/io/driver')
-rw-r--r-- | tokio/src/io/driver/interest.rs | 58 | ||||
-rw-r--r-- | tokio/src/io/driver/mod.rs | 14 | ||||
-rw-r--r-- | tokio/src/io/driver/ready.rs | 8 | ||||
-rw-r--r-- | tokio/src/io/driver/registration.rs | 222 | ||||
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 24 |
5 files changed, 314 insertions, 12 deletions
diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs new file mode 100644 index 00000000..f9887e86 --- /dev/null +++ b/tokio/src/io/driver/interest.rs @@ -0,0 +1,58 @@ +use std::fmt; +use std::ops; + +/// Readiness event interest +/// +/// Specifies the readiness events the caller is interested in when awaiting on +/// I/O resource readiness states. +#[derive(Clone, Copy)] +pub(crate) struct Interest(mio::Interest); + +impl Interest { + /// Interest in all readable events + pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE); + + /// Interest in all writable events + pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); + + /// Returns true if the value includes readable interest. + pub(crate) const fn is_readable(self) -> bool { + self.0.is_readable() + } + + /// Returns true if the value includes writable interest. + pub(crate) const fn is_writable(self) -> bool { + self.0.is_writable() + } + + /// Add together two `Interst` values. + pub(crate) const fn add(self, other: Interest) -> Interest { + Interest(self.0.add(other.0)) + } + + pub(crate) const fn to_mio(self) -> mio::Interest { + self.0 + } +} + +impl ops::BitOr for Interest { + type Output = Self; + + #[inline] + fn bitor(self, other: Self) -> Self { + self.add(other) + } +} + +impl ops::BitOrAssign for Interest { + #[inline] + fn bitor_assign(&mut self, other: Self) { + self.0 = (*self | other).0; + } +} + +impl fmt::Debug for Interest { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index a7d4b7b8..c494db41 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,10 +1,16 @@ #![cfg_attr(not(feature = "rt"), allow(dead_code))] +mod interest; +pub(crate) use interest::Interest; + mod ready; use ready::Ready; +mod registration; +pub(crate) use registration::Registration; + mod scheduled_io; -pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests +use scheduled_io::ScheduledIo; use crate::park::{Park, Unpark}; use crate::util::slab::{self, Slab}; @@ -68,7 +74,7 @@ pub(super) struct Inner { } #[derive(Debug, Eq, PartialEq, Clone, Copy)] -pub(super) enum Direction { +enum Direction { Read, Write, } @@ -313,7 +319,7 @@ impl Inner { pub(super) fn add_source( &self, source: &mut impl mio::event::Source, - interest: mio::Interest, + interest: Interest, ) -> io::Result<slab::Ref<ScheduledIo>> { let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { io::Error::new( @@ -325,7 +331,7 @@ impl Inner { let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); self.registry - .register(source, mio::Token(token), interest)?; + .register(source, mio::Token(token), interest.to_mio())?; Ok(shared) } diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 8b556e94..2790cc13 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -114,8 +114,10 @@ impl Ready { } cfg_io_readiness! { + use crate::io::Interest; + impl Ready { - pub(crate) fn from_interest(interest: mio::Interest) -> Ready { + pub(crate) fn from_interest(interest: Interest) -> Ready { let mut ready = Ready::EMPTY; if interest.is_readable() { @@ -131,11 +133,11 @@ cfg_io_readiness! { ready } - pub(crate) fn intersection(self, interest: mio::Interest) -> Ready { + pub(crate) fn intersection(self, interest: Interest) -> Ready { Ready(self.0 & Ready::from_interest(interest).0) } - pub(crate) fn satisfies(self, interest: mio::Interest) -> bool { + pub(crate) fn satisfies(self, interest: Interest) -> bool { self.0 & Ready::from_interest(interest).0 != 0 } } diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs new file mode 100644 index 00000000..db9afdd7 --- /dev/null +++ b/tokio/src/io/driver/registration.rs @@ -0,0 +1,222 @@ +use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo}; +use crate::util::slab; + +use mio::event::Source; +use std::io; +use std::task::{Context, Poll}; + +cfg_io_driver! { + /// Associates an I/O resource with the reactor instance that drives it. + /// + /// A registration represents an I/O resource registered with a Reactor such + /// that it will receive task notifications on readiness. This is the lowest + /// level API for integrating with a reactor. + /// + /// The association between an I/O resource is made by calling [`new`]. Once + /// the association is established, it remains established until the + /// registration instance is dropped. + /// + /// A registration instance represents two separate readiness streams. One + /// for the read readiness and one for write readiness. These streams are + /// independent and can be consumed from separate tasks. + /// + /// **Note**: while `Registration` is `Sync`, the caller must ensure that + /// there are at most two tasks that use a registration instance + /// concurrently. One task for [`poll_read_ready`] and one task for + /// [`poll_write_ready`]. While violating this requirement is "safe" from a + /// Rust memory safety point of view, it will result in unexpected behavior + /// in the form of lost notifications and tasks hanging. + /// + /// ## Platform-specific events + /// + /// `Registration` also allows receiving platform-specific `mio::Ready` + /// events. These events are included as part of the read readiness event + /// stream. The write readiness event stream is only for `Ready::writable()` + /// events. + /// + /// [`new`]: method@Self::new + /// [`poll_read_ready`]: method@Self::poll_read_ready` + /// [`poll_write_ready`]: method@Self::poll_write_ready` + #[derive(Debug)] + pub(crate) struct Registration { + /// Handle to the associated driver. + handle: Handle, + + /// Reference to state stored by the driver. + shared: slab::Ref<ScheduledIo>, + } +} + +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + +// ===== impl Registration ===== + +impl Registration { + /// Registers the I/O resource with the default reactor, for a specific + /// `Interest`. `new_with_interest` should be used over `new` when you need + /// control over the readiness state, such as when a file descriptor only + /// allows reads. This does not add `hup` or `error` so if you are + /// interested in those states, you will need to add them to the readiness + /// state passed to this function. + /// + /// # Return + /// + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + pub(crate) fn new_with_interest_and_handle( + io: &mut impl Source, + interest: Interest, + handle: Handle, + ) -> io::Result<Registration> { + let shared = if let Some(inner) = handle.inner() { + inner.add_source(io, interest)? + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "failed to find event loop", + )); + }; + + Ok(Registration { handle, shared }) + } + + /// Deregisters the I/O resource from the reactor it is associated with. + /// + /// This function must be called before the I/O resource associated with the + /// registration is dropped. + /// + /// Note that deregistering does not guarantee that the I/O resource can be + /// registered with a different reactor. Some I/O resource types can only be + /// associated with a single reactor instance for their lifetime. + /// + /// # Return + /// + /// If the deregistration was successful, `Ok` is returned. Any calls to + /// `Reactor::turn` that happen after a successful call to `deregister` will + /// no longer result in notifications getting sent for this registration. + /// + /// `Err` is returned if an error is encountered. + pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + inner.deregister_source(io) + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + self.shared.clear_readiness(event); + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.poll_ready(cx, Direction::Read) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.poll_ready(cx, Direction::Write) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_read_io<R>( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + self.poll_io(cx, Direction::Read, f) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_io<R>( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + self.poll_io(cx, Direction::Write, f) + } + + /// Polls for events on the I/O resource's `direction` readiness stream. + /// + /// If called with a task context, notify the task when a new event is + /// received. + fn poll_ready( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll<io::Result<ReadyEvent>> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + let ev = ready!(self.shared.poll_readiness(cx, direction)); + + if self.handle.inner().is_none() { + return Poll::Ready(Err(gone())); + } + + coop.made_progress(); + Poll::Ready(Ok(ev)) + } + + fn poll_io<R>( + &self, + cx: &mut Context<'_>, + direction: Direction, + mut f: impl FnMut() -> io::Result<R>, + ) -> Poll<io::Result<R>> { + loop { + let ev = ready!(self.poll_ready(cx, direction))?; + + match f() { + Ok(ret) => { + return Poll::Ready(Ok(ret)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), + } + } + } +} + +fn gone() -> io::Error { + io::Error::new(io::ErrorKind::Other, "IO driver has terminated") +} + +cfg_io_readiness! { + impl Registration { + pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> { + use std::future::Future; + use std::pin::Pin; + + let fut = self.shared.readiness(interest); + pin!(fut); + + crate::future::poll_fn(|cx| { + if self.handle.inner().is_none() { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); + } + + Pin::new(&mut fut).poll(cx).map(Ok) + }).await + } + + pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> { + loop { + let event = self.readiness(interest).await?; + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(event); + } + x => return x, + } + } + } + } +} diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 3aefb376..ed3adc39 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -49,6 +49,8 @@ struct Waiters { } cfg_io_readiness! { + use crate::io::Interest; + #[derive(Debug)] struct Waiter { pointers: linked_list::Pointers<Waiter>, @@ -57,7 +59,7 @@ cfg_io_readiness! { waker: Option<Waker>, /// The interest this waiter is waiting on - interest: mio::Interest, + interest: Interest, is_ready: bool, @@ -283,7 +285,7 @@ impl ScheduledIo { /// These are to support `AsyncRead` and `AsyncWrite` polling methods, /// which cannot use the `async fn` version. This uses reserved reader /// and writer slots. - pub(in crate::io) fn poll_readiness( + pub(super) fn poll_readiness( &self, cx: &mut Context<'_>, direction: Direction, @@ -299,7 +301,19 @@ impl ScheduledIo { Direction::Read => &mut waiters.reader, Direction::Write => &mut waiters.writer, }; - *slot = Some(cx.waker().clone()); + + // Avoid cloning the waker if one is already stored that matches the + // current task. + match slot { + Some(existing) => { + if !existing.will_wake(cx.waker()) { + *existing = cx.waker().clone(); + } + } + None => { + *slot = Some(cx.waker().clone()); + } + } // Try again, in case the readiness was changed while we were // taking the waiters lock @@ -348,7 +362,7 @@ unsafe impl Sync for ScheduledIo {} cfg_io_readiness! { impl ScheduledIo { /// An async version of `poll_readiness` which uses a linked list of wakers - pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent { + pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { self.readiness_fut(interest).await } @@ -356,7 +370,7 @@ cfg_io_readiness! { // we are borrowing the `UnsafeCell` possibly over await boundaries. // // Go figure. - fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> { + fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { Readiness { scheduled_io: self, state: State::Init, |