From ce891a4df17e632f7557dd0cd1f1e8da89bd6ae4 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 09:28:21 -0800 Subject: io: driver internal cleanup (#3124) * Removes duplicated code by moving it to `Registration`. * impl `Deref` for `PollEvented` to avoid `get_ref()`. * Avoid extra waker clones in I/O driver. * Add `Interest` wrapper around `mio::Interest`. --- tokio/src/io/async_fd.rs | 15 ++- tokio/src/io/driver/interest.rs | 58 +++++++++ tokio/src/io/driver/mod.rs | 14 ++- tokio/src/io/driver/ready.rs | 8 +- tokio/src/io/driver/registration.rs | 222 +++++++++++++++++++++++++++++++++ tokio/src/io/driver/scheduled_io.rs | 24 +++- tokio/src/io/mod.rs | 3 +- tokio/src/io/poll_evented.rs | 228 ++++++++-------------------------- tokio/src/io/registration.rs | 155 ----------------------- tokio/src/macros/cfg.rs | 13 ++ tokio/src/net/tcp/listener.rs | 23 ++-- tokio/src/net/tcp/stream.rs | 73 ++++------- tokio/src/net/udp/socket.rs | 171 ++++++++++--------------- tokio/src/net/unix/datagram/socket.rs | 38 +++--- tokio/src/net/unix/listener.rs | 32 ++--- tokio/src/net/unix/stream.rs | 52 ++------ tokio/src/process/mod.rs | 34 ++--- tokio/src/process/unix/mod.rs | 58 ++++----- tokio/src/signal/unix/driver.rs | 10 +- 19 files changed, 582 insertions(+), 649 deletions(-) create mode 100644 tokio/src/io/driver/interest.rs create mode 100644 tokio/src/io/driver/registration.rs delete mode 100644 tokio/src/io/registration.rs (limited to 'tokio') diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index 617f42e0..99f23fd6 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -1,5 +1,4 @@ -use crate::io::driver::{Direction, Handle, ReadyEvent}; -use crate::io::registration::Registration; +use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; use mio::unix::SourceFd; use std::io; @@ -74,7 +73,7 @@ pub struct AsyncFdReadyGuard<'a, T: AsRawFd> { event: Option, } -const ALL_INTEREST: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); +const ALL_INTEREST: Interest = Interest::READABLE.add(Interest::WRITABLE); impl AsyncFd { /// Creates an AsyncFd backed by (and taking ownership of) an object @@ -145,7 +144,7 @@ impl AsyncFd { &'a self, cx: &mut Context<'_>, ) -> Poll>> { - let event = ready!(self.registration.poll_readiness(cx, Direction::Read))?; + let event = ready!(self.registration.poll_read_ready(cx))?; Ok(AsyncFdReadyGuard { async_fd: self, @@ -170,7 +169,7 @@ impl AsyncFd { &'a self, cx: &mut Context<'_>, ) -> Poll>> { - let event = ready!(self.registration.poll_readiness(cx, Direction::Write))?; + let event = ready!(self.registration.poll_write_ready(cx))?; Ok(AsyncFdReadyGuard { async_fd: self, @@ -179,7 +178,7 @@ impl AsyncFd { .into() } - async fn readiness(&self, interest: mio::Interest) -> io::Result> { + async fn readiness(&self, interest: Interest) -> io::Result> { let event = self.registration.readiness(interest).await?; Ok(AsyncFdReadyGuard { @@ -193,7 +192,7 @@ impl AsyncFd { /// /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard pub async fn readable(&self) -> io::Result> { - self.readiness(mio::Interest::READABLE).await + self.readiness(Interest::READABLE).await } /// Waits for the file descriptor to become writable, returning a @@ -201,7 +200,7 @@ impl AsyncFd { /// /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard pub async fn writable(&self) -> io::Result> { - self.readiness(mio::Interest::WRITABLE).await + self.readiness(Interest::WRITABLE).await } } diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs new file mode 100644 index 00000000..f9887e86 --- /dev/null +++ b/tokio/src/io/driver/interest.rs @@ -0,0 +1,58 @@ +use std::fmt; +use std::ops; + +/// Readiness event interest +/// +/// Specifies the readiness events the caller is interested in when awaiting on +/// I/O resource readiness states. +#[derive(Clone, Copy)] +pub(crate) struct Interest(mio::Interest); + +impl Interest { + /// Interest in all readable events + pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE); + + /// Interest in all writable events + pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); + + /// Returns true if the value includes readable interest. + pub(crate) const fn is_readable(self) -> bool { + self.0.is_readable() + } + + /// Returns true if the value includes writable interest. + pub(crate) const fn is_writable(self) -> bool { + self.0.is_writable() + } + + /// Add together two `Interst` values. + pub(crate) const fn add(self, other: Interest) -> Interest { + Interest(self.0.add(other.0)) + } + + pub(crate) const fn to_mio(self) -> mio::Interest { + self.0 + } +} + +impl ops::BitOr for Interest { + type Output = Self; + + #[inline] + fn bitor(self, other: Self) -> Self { + self.add(other) + } +} + +impl ops::BitOrAssign for Interest { + #[inline] + fn bitor_assign(&mut self, other: Self) { + self.0 = (*self | other).0; + } +} + +impl fmt::Debug for Interest { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index a7d4b7b8..c494db41 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,10 +1,16 @@ #![cfg_attr(not(feature = "rt"), allow(dead_code))] +mod interest; +pub(crate) use interest::Interest; + mod ready; use ready::Ready; +mod registration; +pub(crate) use registration::Registration; + mod scheduled_io; -pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests +use scheduled_io::ScheduledIo; use crate::park::{Park, Unpark}; use crate::util::slab::{self, Slab}; @@ -68,7 +74,7 @@ pub(super) struct Inner { } #[derive(Debug, Eq, PartialEq, Clone, Copy)] -pub(super) enum Direction { +enum Direction { Read, Write, } @@ -313,7 +319,7 @@ impl Inner { pub(super) fn add_source( &self, source: &mut impl mio::event::Source, - interest: mio::Interest, + interest: Interest, ) -> io::Result> { let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { io::Error::new( @@ -325,7 +331,7 @@ impl Inner { let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); self.registry - .register(source, mio::Token(token), interest)?; + .register(source, mio::Token(token), interest.to_mio())?; Ok(shared) } diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 8b556e94..2790cc13 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -114,8 +114,10 @@ impl Ready { } cfg_io_readiness! { + use crate::io::Interest; + impl Ready { - pub(crate) fn from_interest(interest: mio::Interest) -> Ready { + pub(crate) fn from_interest(interest: Interest) -> Ready { let mut ready = Ready::EMPTY; if interest.is_readable() { @@ -131,11 +133,11 @@ cfg_io_readiness! { ready } - pub(crate) fn intersection(self, interest: mio::Interest) -> Ready { + pub(crate) fn intersection(self, interest: Interest) -> Ready { Ready(self.0 & Ready::from_interest(interest).0) } - pub(crate) fn satisfies(self, interest: mio::Interest) -> bool { + pub(crate) fn satisfies(self, interest: Interest) -> bool { self.0 & Ready::from_interest(interest).0 != 0 } } diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs new file mode 100644 index 00000000..db9afdd7 --- /dev/null +++ b/tokio/src/io/driver/registration.rs @@ -0,0 +1,222 @@ +use crate::io::driver::{Direction, Handle, Interest, ReadyEvent, ScheduledIo}; +use crate::util::slab; + +use mio::event::Source; +use std::io; +use std::task::{Context, Poll}; + +cfg_io_driver! { + /// Associates an I/O resource with the reactor instance that drives it. + /// + /// A registration represents an I/O resource registered with a Reactor such + /// that it will receive task notifications on readiness. This is the lowest + /// level API for integrating with a reactor. + /// + /// The association between an I/O resource is made by calling [`new`]. Once + /// the association is established, it remains established until the + /// registration instance is dropped. + /// + /// A registration instance represents two separate readiness streams. One + /// for the read readiness and one for write readiness. These streams are + /// independent and can be consumed from separate tasks. + /// + /// **Note**: while `Registration` is `Sync`, the caller must ensure that + /// there are at most two tasks that use a registration instance + /// concurrently. One task for [`poll_read_ready`] and one task for + /// [`poll_write_ready`]. While violating this requirement is "safe" from a + /// Rust memory safety point of view, it will result in unexpected behavior + /// in the form of lost notifications and tasks hanging. + /// + /// ## Platform-specific events + /// + /// `Registration` also allows receiving platform-specific `mio::Ready` + /// events. These events are included as part of the read readiness event + /// stream. The write readiness event stream is only for `Ready::writable()` + /// events. + /// + /// [`new`]: method@Self::new + /// [`poll_read_ready`]: method@Self::poll_read_ready` + /// [`poll_write_ready`]: method@Self::poll_write_ready` + #[derive(Debug)] + pub(crate) struct Registration { + /// Handle to the associated driver. + handle: Handle, + + /// Reference to state stored by the driver. + shared: slab::Ref, + } +} + +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + +// ===== impl Registration ===== + +impl Registration { + /// Registers the I/O resource with the default reactor, for a specific + /// `Interest`. `new_with_interest` should be used over `new` when you need + /// control over the readiness state, such as when a file descriptor only + /// allows reads. This does not add `hup` or `error` so if you are + /// interested in those states, you will need to add them to the readiness + /// state passed to this function. + /// + /// # Return + /// + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + pub(crate) fn new_with_interest_and_handle( + io: &mut impl Source, + interest: Interest, + handle: Handle, + ) -> io::Result { + let shared = if let Some(inner) = handle.inner() { + inner.add_source(io, interest)? + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "failed to find event loop", + )); + }; + + Ok(Registration { handle, shared }) + } + + /// Deregisters the I/O resource from the reactor it is associated with. + /// + /// This function must be called before the I/O resource associated with the + /// registration is dropped. + /// + /// Note that deregistering does not guarantee that the I/O resource can be + /// registered with a different reactor. Some I/O resource types can only be + /// associated with a single reactor instance for their lifetime. + /// + /// # Return + /// + /// If the deregistration was successful, `Ok` is returned. Any calls to + /// `Reactor::turn` that happen after a successful call to `deregister` will + /// no longer result in notifications getting sent for this registration. + /// + /// `Err` is returned if an error is encountered. + pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + inner.deregister_source(io) + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + self.shared.clear_readiness(event); + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.poll_ready(cx, Direction::Read) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.poll_ready(cx, Direction::Write) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_read_io( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result, + ) -> Poll> { + self.poll_io(cx, Direction::Read, f) + } + + // Uses the poll path, requiring the caller to ensure mutual exclusion for + // correctness. Only the last task to call this function is notified. + pub(crate) fn poll_write_io( + &self, + cx: &mut Context<'_>, + f: impl FnMut() -> io::Result, + ) -> Poll> { + self.poll_io(cx, Direction::Write, f) + } + + /// Polls for events on the I/O resource's `direction` readiness stream. + /// + /// If called with a task context, notify the task when a new event is + /// received. + fn poll_ready( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + let ev = ready!(self.shared.poll_readiness(cx, direction)); + + if self.handle.inner().is_none() { + return Poll::Ready(Err(gone())); + } + + coop.made_progress(); + Poll::Ready(Ok(ev)) + } + + fn poll_io( + &self, + cx: &mut Context<'_>, + direction: Direction, + mut f: impl FnMut() -> io::Result, + ) -> Poll> { + loop { + let ev = ready!(self.poll_ready(cx, direction))?; + + match f() { + Ok(ret) => { + return Poll::Ready(Ok(ret)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), + } + } + } +} + +fn gone() -> io::Error { + io::Error::new(io::ErrorKind::Other, "IO driver has terminated") +} + +cfg_io_readiness! { + impl Registration { + pub(crate) async fn readiness(&self, interest: Interest) -> io::Result { + use std::future::Future; + use std::pin::Pin; + + let fut = self.shared.readiness(interest); + pin!(fut); + + crate::future::poll_fn(|cx| { + if self.handle.inner().is_none() { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); + } + + Pin::new(&mut fut).poll(cx).map(Ok) + }).await + } + + pub(crate) async fn async_io(&self, interest: Interest, mut f: impl FnMut() -> io::Result) -> io::Result { + loop { + let event = self.readiness(interest).await?; + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(event); + } + x => return x, + } + } + } + } +} diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 3aefb376..ed3adc39 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -49,6 +49,8 @@ struct Waiters { } cfg_io_readiness! { + use crate::io::Interest; + #[derive(Debug)] struct Waiter { pointers: linked_list::Pointers, @@ -57,7 +59,7 @@ cfg_io_readiness! { waker: Option, /// The interest this waiter is waiting on - interest: mio::Interest, + interest: Interest, is_ready: bool, @@ -283,7 +285,7 @@ impl ScheduledIo { /// These are to support `AsyncRead` and `AsyncWrite` polling methods, /// which cannot use the `async fn` version. This uses reserved reader /// and writer slots. - pub(in crate::io) fn poll_readiness( + pub(super) fn poll_readiness( &self, cx: &mut Context<'_>, direction: Direction, @@ -299,7 +301,19 @@ impl ScheduledIo { Direction::Read => &mut waiters.reader, Direction::Write => &mut waiters.writer, }; - *slot = Some(cx.waker().clone()); + + // Avoid cloning the waker if one is already stored that matches the + // current task. + match slot { + Some(existing) => { + if !existing.will_wake(cx.waker()) { + *existing = cx.waker().clone(); + } + } + None => { + *slot = Some(cx.waker().clone()); + } + } // Try again, in case the readiness was changed while we were // taking the waiters lock @@ -348,7 +362,7 @@ unsafe impl Sync for ScheduledIo {} cfg_io_readiness! { impl ScheduledIo { /// An async version of `poll_readiness` which uses a linked list of wakers - pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent { + pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { self.readiness_fut(interest).await } @@ -356,7 +370,7 @@ cfg_io_readiness! { // we are borrowing the `UnsafeCell` possibly over await boundaries. // // Go figure. - fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> { + fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { Readiness { scheduled_io: self, state: State::Init, diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 20d92233..499633ee 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -206,8 +206,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom}; cfg_io_driver! { pub(crate) mod driver; - - mod registration; + pub(crate) use driver::Interest; mod poll_evented; diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 66a26346..803932ba 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,13 +1,9 @@ -use crate::io::driver::{Direction, Handle, ReadyEvent}; -use crate::io::registration::Registration; -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::driver::{Handle, Interest, Registration}; use mio::event::Source; use std::fmt; -use std::io::{self, Read, Write}; -use std::marker::Unpin; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::io; +use std::ops::Deref; cfg_io_driver! { /// Associates an I/O resource that implements the [`std::io::Read`] and/or @@ -89,30 +85,32 @@ impl PollEvented { /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result { - PollEvented::new_with_interest(io, mio::Interest::READABLE | mio::Interest::WRITABLE) + PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) } - /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Interest` - /// state. `new_with_interest` should be used over `new` when you need control over the readiness - /// state, such as when a file descriptor only allows reads. This does not add `hup` or `error` - /// so if you are interested in those states, you will need to add them to the readiness state - /// passed to this function. + /// Creates a new `PollEvented` associated with the default reactor, for + /// specific `Interest` state. `new_with_interest` should be used over `new` + /// when you need control over the readiness state, such as when a file + /// descriptor only allows reads. This does not add `hup` or `error` so if + /// you are interested in those states, you will need to add them to the + /// readiness state passed to this function. /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + /// The runtime is usually set implicitly when this function is called from + /// a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) + /// function. #[cfg_attr(feature = "signal", allow(unused))] - pub(crate) fn new_with_interest(io: E, interest: mio::Interest) -> io::Result { + pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result { Self::new_with_interest_and_handle(io, interest, Handle::current()) } pub(crate) fn new_with_interest_and_handle( mut io: E, - interest: mio::Interest, + interest: Interest, handle: Handle, ) -> io::Result { let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; @@ -122,177 +120,57 @@ impl PollEvented { }) } - /// Returns a shared reference to the underlying I/O object this readiness - /// stream is wrapping. - #[cfg(any(feature = "net", feature = "process", feature = "signal"))] - pub(crate) fn get_ref(&self) -> &E { - self.io.as_ref().unwrap() - } - - /// Returns a mutable reference to the underlying I/O object this readiness - /// stream is wrapping. - pub(crate) fn get_mut(&mut self) -> &mut E { - self.io.as_mut().unwrap() - } - - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - self.registration.clear_readiness(event); + /// Returns a reference to the registration + pub(crate) fn registration(&self) -> &Registration { + &self.registration } +} - /// Checks the I/O resource's read readiness state. - /// - /// The mask argument allows specifying what readiness to notify on. This - /// can be any value, including platform specific readiness, **except** - /// `writable`. HUP is always implicitly included on platforms that support - /// it. - /// - /// If the resource is not ready for a read then `Poll::Pending` is returned - /// and the current task is notified once a new event is received. - /// - /// The I/O resource will remain in a read-ready state until readiness is - /// cleared by calling [`clear_read_ready`]. - /// - /// [`clear_read_ready`]: method@Self::clear_read_ready - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` includes writable. - /// * called from outside of a task context. - /// - /// # Warning - /// - /// This method may not be called concurrently. It takes `&self` to allow - /// calling it concurrently with `poll_write_ready`. - pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.registration.poll_readiness(cx, Direction::Read) - } +feature! { + #![any(feature = "net", feature = "process")] - /// Checks the I/O resource's write readiness state. - /// - /// This always checks for writable readiness and also checks for HUP - /// readiness on platforms that support it. - /// - /// If the resource is not ready for a write then `Poll::Pending` is - /// returned and the current task is notified once a new event is received. - /// - /// The I/O resource will remain in a write-ready state until readiness is - /// cleared by calling [`clear_write_ready`]. - /// - /// [`clear_write_ready`]: method@Self::clear_write_ready - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` contains bits besides `writable` and `hup`. - /// * called from outside of a task context. - /// - /// # Warning - /// - /// This method may not be called concurrently. It takes `&self` to allow - /// calling it concurrently with `poll_read_ready`. - pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.registration.poll_readiness(cx, Direction::Write) - } -} + use crate::io::ReadBuf; + use std::task::{Context, Poll}; -cfg_io_readiness! { impl PollEvented { - pub(crate) async fn readiness(&self, interest: mio::Interest) -> io::Result { - self.registration.readiness(interest).await - } - - pub(crate) async fn async_io(&self, interest: mio::Interest, mut op: F) -> io::Result + // Safety: The caller must ensure that `E` can read into uninitialized memory + pub(crate) unsafe fn poll_read<'a>( + &'a self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> where - F: FnMut(&E) -> io::Result, + &'a E: io::Read + 'a, { - loop { - let event = self.readiness(interest).await?; - - match op(self.get_ref()) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(event); - } - x => return x, - } - } + use std::io::Read; + + let n = ready!(self.registration.poll_read_io(cx, || { + let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]); + self.io.as_ref().unwrap().read(b) + }))?; + + // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the + // buffer. + buf.assume_init(n); + buf.advance(n); + Poll::Ready(Ok(())) } - } -} - -// ===== Read / Write impls ===== - -impl AsyncRead for PollEvented { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - loop { - let ev = ready!(self.poll_read_ready(cx))?; - - // We can't assume the `Read` won't look at the read buffer, - // so we have to force initialization here. - let r = (*self).get_mut().read(buf.initialize_unfilled()); - if is_wouldblock(&r) { - self.clear_readiness(ev); - continue; - } - - return Poll::Ready(r.map(|n| { - buf.advance(n); - })); + pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> + where + &'a E: io::Write + 'a, + { + use std::io::Write; + self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf)) } } } -impl AsyncWrite for PollEvented { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - loop { - let ev = ready!(self.poll_write_ready(cx))?; +impl Deref for PollEvented { + type Target = E; - let r = (*self).get_mut().write(buf); - - if is_wouldblock(&r) { - self.clear_readiness(ev); - continue; - } - - return Poll::Ready(r); - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - let ev = ready!(self.poll_write_ready(cx))?; - - let r = (*self).get_mut().flush(); - - if is_wouldblock(&r) { - self.clear_readiness(ev); - continue; - } - - return Poll::Ready(r); - } - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} - -fn is_wouldblock(r: &io::Result) -> bool { - match *r { - Ok(_) => false, - Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + fn deref(&self) -> &E { + self.io.as_ref().unwrap() } } diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs deleted file mode 100644 index 0b166490..00000000 --- a/tokio/src/io/registration.rs +++ /dev/null @@ -1,155 +0,0 @@ -use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo}; -use crate::util::slab; - -use mio::event::Source; -use std::io; -use std::task::{Context, Poll}; - -cfg_io_driver! { - /// Associates an I/O resource with the reactor instance that drives it. - /// - /// A registration represents an I/O resource registered with a Reactor such - /// that it will receive task notifications on readiness. This is the lowest - /// level API for integrating with a reactor. - /// - /// The association between an I/O resource is made by calling [`new`]. Once - /// the association is established, it remains established until the - /// registration instance is dropped. - /// - /// A registration instance represents two separate readiness streams. One - /// for the read readiness and one for write readiness. These streams are - /// independent and can be consumed from separate tasks. - /// - /// **Note**: while `Registration` is `Sync`, the caller must ensure that - /// there are at most two tasks that use a registration instance - /// concurrently. One task for [`poll_read_ready`] and one task for - /// [`poll_write_ready`]. While violating this requirement is "safe" from a - /// Rust memory safety point of view, it will result in unexpected behavior - /// in the form of lost notifications and tasks hanging. - /// - /// ## Platform-specific events - /// - /// `Registration` also allows receiving platform-specific `mio::Ready` - /// events. These events are included as part of the read readiness event - /// stream. The write readiness event stream is only for `Ready::writable()` - /// events. - /// - /// [`new`]: method@Self::new - /// [`poll_read_ready`]: method@Self::poll_read_ready` - /// [`poll_write_ready`]: method@Self::poll_write_ready` - #[derive(Debug)] - pub(crate) struct Registration { - /// Handle to the associated driver. - handle: Handle, - - /// Reference to state stored by the driver. - shared: slab::Ref, - } -} - -unsafe impl Send for Registration {} -unsafe impl Sync for Registration {} - -// ===== impl Registration ===== - -impl Registration { - /// Registers the I/O resource with the default reactor, for a specific `mio::Interest`. - /// `new_with_interest` should be used over `new` when you need control over the readiness state, - /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if - /// you are interested in those states, you will need to add them to the readiness state passed - /// to this function. - /// - /// # Return - /// - /// - `Ok` if the registration happened successfully - /// - `Err` if an error was encountered during registration - pub(crate) fn new_with_interest_and_handle( - io: &mut impl Source, - interest: mio::Interest, - handle: Handle, - ) -> io::Result { - let shared = if let Some(inner) = handle.inner() { - inner.add_source(io, interest)? - } else { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to find event loop", - )); - }; - - Ok(Registration { handle, shared }) - } - - /// Deregisters the I/O resource from the reactor it is associated with. - /// - /// This function must be called before the I/O resource associated with the - /// registration is dropped. - /// - /// Note that deregistering does not guarantee that the I/O resource can be - /// registered with a different reactor. Some I/O resource types can only be - /// associated with a single reactor instance for their lifetime. - /// - /// # Return - /// - /// If the deregistration was successful, `Ok` is returned. Any calls to - /// `Reactor::turn` that happen after a successful call to `deregister` will - /// no longer result in notifications getting sent for this registration. - /// - /// `Err` is returned if an error is encountered. - pub(super) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - inner.deregister_source(io) - } - - pub(super) fn clear_readiness(&self, event: ReadyEvent) { - self.shared.clear_readiness(event); - } - - /// Polls for events on the I/O resource's `direction` readiness stream. - /// - /// If called with a task context, notify the task when a new event is - /// received. - pub(super) fn poll_readiness( - &self, - cx: &mut Context<'_>, - direction: Direction, - ) -> Poll> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - let ev = ready!(self.shared.poll_readiness(cx, direction)); - - if self.handle.inner().is_none() { - return Poll::Ready(Err(gone())); - } - - coop.made_progress(); - Poll::Ready(Ok(ev)) - } -} - -fn gone() -> io::Error { - io::Error::new(io::ErrorKind::Other, "IO driver has terminated") -} - -cfg_io_readiness! { - impl Registration { - pub(super) async fn readiness(&self, interest: mio::Interest) -> io::Result { - use std::future::Future; - use std::pin::Pin; - - let fut = self.shared.readiness(interest); - pin!(fut); - - crate::future::poll_fn(|cx| { - if self.handle.inner().is_none() { - return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); - } - - Pin::new(&mut fut).poll(cx).map(Ok) - }).await - } - } -} diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 27929119..edf681a4 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -1,5 +1,18 @@ #![allow(unused_macros)] +macro_rules! feature { + ( + #![$meta:meta] + $($item:item)* + ) => { + $( + #[cfg($meta)] + #[cfg_attr(docsrs, doc(cfg($meta)))] + $item + )* + } +} + /// Enables enter::block_on macro_rules! cfg_block_on { ($($item:item)*) => { diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index be528f2b..8b0a4803 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -1,4 +1,4 @@ -use crate::io::PollEvented; +use crate::io::{Interest, PollEvented}; use crate::net::tcp::TcpStream; use crate::net::{to_socket_addrs, ToSocketAddrs}; @@ -164,7 +164,8 @@ impl TcpListener { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { let (mio, addr) = self .io - .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .registration() + .async_io(Interest::READABLE, || self.io.accept()) .await?; let stream = TcpStream::new(mio)?; @@ -181,15 +182,15 @@ impl TcpListener { /// single task. Failing to do this could result in tasks hanging. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { loop { - let ev = ready!(self.io.poll_read_ready(cx))?; + let ev = ready!(self.io.registration().poll_read_ready(cx))?; - match self.io.get_ref().accept() { + match self.io.accept() { Ok((io, addr)) => { let io = TcpStream::new(io)?; return Poll::Ready(Ok((io, addr))); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); + self.io.registration().clear_readiness(ev); } Err(e) => return Poll::Ready(Err(e)), } @@ -266,7 +267,7 @@ impl TcpListener { /// } /// ``` pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Gets the value of the `IP_TTL` option for this socket. @@ -293,7 +294,7 @@ impl TcpListener { /// } /// ``` pub fn ttl(&self) -> io::Result { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -318,7 +319,7 @@ impl TcpListener { /// } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } } @@ -346,7 +347,7 @@ impl TryFrom for TcpListener { impl fmt::Debug for TcpListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -357,7 +358,7 @@ mod sys { impl AsRawFd for TcpListener { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -369,7 +370,7 @@ mod sys { impl AsRawSocket for TcpListener { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 045cb6c3..0a784b5f 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,12 +1,12 @@ use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; -use std::io::{self, Read, Write}; +use std::io; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -129,9 +129,9 @@ impl TcpStream { // actually hit an error or not. // // If all that succeeded then we ship everything on up. - poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; - if let Some(e) = stream.io.get_ref().take_error()? { + if let Some(e) = stream.io.take_error()? { return Err(e); } @@ -193,7 +193,7 @@ impl TcpStream { /// # } /// ``` pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Returns the remote address that this stream is connected to. @@ -211,7 +211,7 @@ impl TcpStream { /// # } /// ``` pub fn peer_addr(&self) -> io::Result { - self.io.get_ref().peer_addr() + self.io.peer_addr() } /// Attempts to receive data on the socket, without removing that data from @@ -252,12 +252,12 @@ impl TcpStream { /// ``` pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { loop { - let ev = ready!(self.io.poll_read_ready(cx))?; + let ev = ready!(self.io.registration().poll_read_ready(cx))?; - match self.io.get_ref().peek(buf) { + match self.io.peek(buf) { Ok(ret) => return Poll::Ready(Ok(ret)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); + self.io.registration().clear_readiness(ev); } Err(e) => return Poll::Ready(Err(e)), } @@ -303,7 +303,8 @@ impl TcpStream { /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt pub async fn peek(&self, buf: &mut [u8]) -> io::Result { self.io - .async_io(mio::Interest::READABLE, |io| io.peek(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.peek(buf)) .await } @@ -332,7 +333,7 @@ impl TcpStream { /// } /// ``` pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io.get_ref().shutdown(how) + self.io.shutdown(how) } /// Gets the value of the `TCP_NODELAY` option on this socket. @@ -354,7 +355,7 @@ impl TcpStream { /// # } /// ``` pub fn nodelay(&self) -> io::Result { - self.io.get_ref().nodelay() + self.io.nodelay() } /// Sets the value of the `TCP_NODELAY` option on this socket. @@ -378,7 +379,7 @@ impl TcpStream { /// # } /// ``` pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { - self.io.get_ref().set_nodelay(nodelay) + self.io.set_nodelay(nodelay) } /// Gets the value of the `IP_TTL` option for this socket. @@ -400,7 +401,7 @@ impl TcpStream { /// # } /// ``` pub fn ttl(&self) -> io::Result { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -421,7 +422,7 @@ impl TcpStream { /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } // These lifetime markers also appear in the generated documentation, and make @@ -469,29 +470,8 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - - // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes. - let b = unsafe { - &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) - }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Ok(n) => { - // Safety: We trust `TcpStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(())); - } - Err(e) => return Poll::Ready(Err(e)), - } - } + // Safety: `TcpStream::read` correctly handles reads into uninitialized memory + unsafe { self.io.poll_read(cx, buf) } } pub(super) fn poll_write_priv( @@ -499,16 +479,7 @@ impl TcpStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io.poll_write(cx, buf) } } @@ -559,7 +530,7 @@ impl AsyncWrite for TcpStream { impl fmt::Debug for TcpStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -570,7 +541,7 @@ mod sys { impl AsRawFd for TcpStream { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -582,7 +553,7 @@ mod sys { impl AsRawSocket for TcpStream { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index c68e37f8..f8b6a787 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -1,4 +1,4 @@ -use crate::io::{PollEvented, ReadBuf}; +use crate::io::{Interest, PollEvented, ReadBuf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; @@ -216,7 +216,7 @@ impl UdpSocket { /// # } /// ``` pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Connects the UDP socket setting the default destination for send() and @@ -248,7 +248,7 @@ impl UdpSocket { let mut last_err = None; for addr in addrs { - match self.io.get_ref().connect(addr) { + match self.io.connect(addr) { Ok(_) => return Ok(()), Err(e) => last_err = Some(e), } @@ -271,7 +271,8 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn send(&self, buf: &[u8]) -> io::Result { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send(buf)) .await } @@ -299,16 +300,9 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io + .registration() + .poll_write_io(cx, || self.io.send(buf)) } /// Try to send data on the socket to the remote address to which it is @@ -322,7 +316,7 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result { - self.io.get_ref().send(buf) + self.io.send(buf) } /// Returns a future that receives a single datagram message on the socket from @@ -339,7 +333,8 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv(buf)) .await } @@ -367,29 +362,21 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let n = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; - match self.io.get_ref().recv(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok(n) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(())); - } - } + + self.io.recv(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(())) } /// Returns a future that sends data on the socket to the given address. @@ -448,16 +435,9 @@ impl UdpSocket { buf: &[u8], target: &SocketAddr, ) -> Poll> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, *target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io + .registration() + .poll_write_io(cx, || self.io.send_to(buf, *target)) } /// Try to send data on the socket to the given address, but if the send is blocked @@ -489,12 +469,13 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { - self.io.get_ref().send_to(buf, target) + self.io.send_to(buf, target) } async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send_to(buf, target)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send_to(buf, target)) .await } @@ -522,7 +503,8 @@ impl UdpSocket { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv_from(buf)) .await } @@ -548,29 +530,21 @@ impl UdpSocket { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; - match self.io.get_ref().recv_from(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok((n, addr)) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(addr)); - } - } + + self.io.recv_from(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(addr)) } /// Receives data from the socket, without removing it from the input queue. @@ -602,7 +576,8 @@ impl UdpSocket { /// ``` pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.peek_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.peek_from(buf)) .await } @@ -637,29 +612,21 @@ impl UdpSocket { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) }; - match self.io.get_ref().peek_from(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok((n, addr)) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(addr)); - } - } + + self.io.peek_from(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(addr)) } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -668,7 +635,7 @@ impl UdpSocket { /// /// [`set_broadcast`]: method@Self::set_broadcast pub fn broadcast(&self) -> io::Result { - self.io.get_ref().broadcast() + self.io.broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. @@ -676,7 +643,7 @@ impl UdpSocket { /// When enabled, this socket is allowed to send packets to a broadcast /// address. pub fn set_broadcast(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_broadcast(on) + self.io.set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -685,7 +652,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4 pub fn multicast_loop_v4(&self) -> io::Result { - self.io.get_ref().multicast_loop_v4() + self.io.multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -696,7 +663,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_multicast_loop_v4(on) + self.io.set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -705,7 +672,7 @@ impl UdpSocket { /// /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4 pub fn multicast_ttl_v4(&self) -> io::Result { - self.io.get_ref().multicast_ttl_v4() + self.io.multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -718,7 +685,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_multicast_ttl_v4(ttl) + self.io.set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -727,7 +694,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6 pub fn multicast_loop_v6(&self) -> io::Result { - self.io.get_ref().multicast_loop_v6() + self.io.multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -738,7 +705,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv4 sockets. pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_multicast_loop_v6(on) + self.io.set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. @@ -761,7 +728,7 @@ impl UdpSocket { /// # } /// ``` pub fn ttl(&self) -> io::Result { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -783,7 +750,7 @@ impl UdpSocket { /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. @@ -794,7 +761,7 @@ impl UdpSocket { /// multicast group. If it's equal to `INADDR_ANY` then an appropriate /// interface is chosen by the system. pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { - self.io.get_ref().join_multicast_v4(&multiaddr, &interface) + self.io.join_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. @@ -803,7 +770,7 @@ impl UdpSocket { /// The address must be a valid multicast address, and `interface` is the /// index of the interface to join/leave (or 0 to indicate any interface). pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.io.get_ref().join_multicast_v6(multiaddr, interface) + self.io.join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. @@ -812,7 +779,7 @@ impl UdpSocket { /// /// [`join_multicast_v4`]: method@Self::join_multicast_v4 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { - self.io.get_ref().leave_multicast_v4(&multiaddr, &interface) + self.io.leave_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. @@ -821,7 +788,7 @@ impl UdpSocket { /// /// [`join_multicast_v6`]: method@Self::join_multicast_v6 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.io.get_ref().leave_multicast_v6(multiaddr, interface) + self.io.leave_multicast_v6(multiaddr, interface) } /// Returns the value of the `SO_ERROR` option. @@ -844,7 +811,7 @@ impl UdpSocket { /// # } /// ``` pub fn take_error(&self) -> io::Result> { - self.io.get_ref().take_error() + self.io.take_error() } } @@ -862,7 +829,7 @@ impl TryFrom for UdpSocket { impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -873,7 +840,7 @@ mod sys { impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -885,7 +852,7 @@ mod sys { impl AsRawSocket for UdpSocket { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 6215b579..f9e9321b 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,4 +1,4 @@ -use crate::io::PollEvented; +use crate::io::{Interest, PollEvented}; use crate::net::unix::SocketAddr; use std::convert::TryFrom; @@ -270,7 +270,7 @@ impl UnixDatagram { /// # } /// ``` pub fn connect>(&self, path: P) -> io::Result<()> { - self.io.get_ref().connect(path) + self.io.connect(path) } /// Sends data on the socket to the socket's peer. @@ -301,7 +301,8 @@ impl UnixDatagram { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send(buf)) .await } @@ -330,7 +331,7 @@ impl UnixDatagram { /// # } /// ``` pub fn try_send(&self, buf: &[u8]) -> io::Result { - self.io.get_ref().send(buf) + self.io.send(buf) } /// Try to send a datagram to the peer without waiting. @@ -369,7 +370,7 @@ impl UnixDatagram { where P: AsRef, { - self.io.get_ref().send_to(buf, target) + self.io.send_to(buf, target) } /// Receives data from the socket. @@ -400,7 +401,8 @@ impl UnixDatagram { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv(buf)) .await } @@ -429,7 +431,7 @@ impl UnixDatagram { /// # } /// ``` pub fn try_recv(&self, buf: &mut [u8]) -> io::Result { - self.io.get_ref().recv(buf) + self.io.recv(buf) } /// Sends data on the socket to the specified address. @@ -470,9 +472,8 @@ impl UnixDatagram { P: AsRef, { self.io - .async_io(mio::Interest::WRITABLE, |sock| { - sock.send_to(buf, target.as_ref()) - }) + .registration() + .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref())) .await } @@ -512,7 +513,8 @@ impl UnixDatagram { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { let (n, addr) = self .io - .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv_from(buf)) .await?; Ok((n, SocketAddr(addr))) @@ -551,7 +553,7 @@ impl UnixDatagram { /// # } /// ``` pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - let (n, addr) = self.io.get_ref().recv_from(buf)?; + let (n, addr) = self.io.recv_from(buf)?; Ok((n, SocketAddr(addr))) } @@ -596,7 +598,7 @@ impl UnixDatagram { /// # } /// ``` pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr().map(SocketAddr) + self.io.local_addr().map(SocketAddr) } /// Returns the address of this socket's peer. @@ -645,7 +647,7 @@ impl UnixDatagram { /// # } /// ``` pub fn peer_addr(&self) -> io::Result { - self.io.get_ref().peer_addr().map(SocketAddr) + self.io.peer_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR`