diff options
author | Carl Lerche <me@carllerche.com> | 2020-11-11 09:28:21 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-11 09:28:21 -0800 |
commit | ce891a4df17e632f7557dd0cd1f1e8da89bd6ae4 (patch) | |
tree | fa5478c0b3bacacfc65bfbadbe1cdb92234d5b5f /tokio/src/io/poll_evented.rs | |
parent | d869e16990c5fc2cbda48b036708efa4b450e807 (diff) |
io: driver internal cleanup (#3124)
* Removes duplicated code by moving it to `Registration`.
* impl `Deref` for `PollEvented` to avoid `get_ref()`.
* Avoid extra waker clones in I/O driver.
* Add `Interest` wrapper around `mio::Interest`.
Diffstat (limited to 'tokio/src/io/poll_evented.rs')
-rw-r--r-- | tokio/src/io/poll_evented.rs | 228 |
1 files changed, 53 insertions, 175 deletions
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 66a26346..803932ba 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,13 +1,9 @@ -use crate::io::driver::{Direction, Handle, ReadyEvent}; -use crate::io::registration::Registration; -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::driver::{Handle, Interest, Registration}; use mio::event::Source; use std::fmt; -use std::io::{self, Read, Write}; -use std::marker::Unpin; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::io; +use std::ops::Deref; cfg_io_driver! { /// Associates an I/O resource that implements the [`std::io::Read`] and/or @@ -89,30 +85,32 @@ impl<E: Source> PollEvented<E> { /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result<Self> { - PollEvented::new_with_interest(io, mio::Interest::READABLE | mio::Interest::WRITABLE) + PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) } - /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Interest` - /// state. `new_with_interest` should be used over `new` when you need control over the readiness - /// state, such as when a file descriptor only allows reads. This does not add `hup` or `error` - /// so if you are interested in those states, you will need to add them to the readiness state - /// passed to this function. + /// Creates a new `PollEvented` associated with the default reactor, for + /// specific `Interest` state. `new_with_interest` should be used over `new` + /// when you need control over the readiness state, such as when a file + /// descriptor only allows reads. This does not add `hup` or `error` so if + /// you are interested in those states, you will need to add them to the + /// readiness state passed to this function. /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + /// The runtime is usually set implicitly when this function is called from + /// a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) + /// function. #[cfg_attr(feature = "signal", allow(unused))] - pub(crate) fn new_with_interest(io: E, interest: mio::Interest) -> io::Result<Self> { + pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { Self::new_with_interest_and_handle(io, interest, Handle::current()) } pub(crate) fn new_with_interest_and_handle( mut io: E, - interest: mio::Interest, + interest: Interest, handle: Handle, ) -> io::Result<Self> { let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; @@ -122,177 +120,57 @@ impl<E: Source> PollEvented<E> { }) } - /// Returns a shared reference to the underlying I/O object this readiness - /// stream is wrapping. - #[cfg(any(feature = "net", feature = "process", feature = "signal"))] - pub(crate) fn get_ref(&self) -> &E { - self.io.as_ref().unwrap() - } - - /// Returns a mutable reference to the underlying I/O object this readiness - /// stream is wrapping. - pub(crate) fn get_mut(&mut self) -> &mut E { - self.io.as_mut().unwrap() - } - - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - self.registration.clear_readiness(event); + /// Returns a reference to the registration + pub(crate) fn registration(&self) -> &Registration { + &self.registration } +} - /// Checks the I/O resource's read readiness state. - /// - /// The mask argument allows specifying what readiness to notify on. This - /// can be any value, including platform specific readiness, **except** - /// `writable`. HUP is always implicitly included on platforms that support - /// it. - /// - /// If the resource is not ready for a read then `Poll::Pending` is returned - /// and the current task is notified once a new event is received. - /// - /// The I/O resource will remain in a read-ready state until readiness is - /// cleared by calling [`clear_read_ready`]. - /// - /// [`clear_read_ready`]: method@Self::clear_read_ready - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` includes writable. - /// * called from outside of a task context. - /// - /// # Warning - /// - /// This method may not be called concurrently. It takes `&self` to allow - /// calling it concurrently with `poll_write_ready`. - pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { - self.registration.poll_readiness(cx, Direction::Read) - } +feature! { + #![any(feature = "net", feature = "process")] - /// Checks the I/O resource's write readiness state. - /// - /// This always checks for writable readiness and also checks for HUP - /// readiness on platforms that support it. - /// - /// If the resource is not ready for a write then `Poll::Pending` is - /// returned and the current task is notified once a new event is received. - /// - /// The I/O resource will remain in a write-ready state until readiness is - /// cleared by calling [`clear_write_ready`]. - /// - /// [`clear_write_ready`]: method@Self::clear_write_ready - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` contains bits besides `writable` and `hup`. - /// * called from outside of a task context. - /// - /// # Warning - /// - /// This method may not be called concurrently. It takes `&self` to allow - /// calling it concurrently with `poll_read_ready`. - pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { - self.registration.poll_readiness(cx, Direction::Write) - } -} + use crate::io::ReadBuf; + use std::task::{Context, Poll}; -cfg_io_readiness! { impl<E: Source> PollEvented<E> { - pub(crate) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> { - self.registration.readiness(interest).await - } - - pub(crate) async fn async_io<F, R>(&self, interest: mio::Interest, mut op: F) -> io::Result<R> + // Safety: The caller must ensure that `E` can read into uninitialized memory + pub(crate) unsafe fn poll_read<'a>( + &'a self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> where - F: FnMut(&E) -> io::Result<R>, + &'a E: io::Read + 'a, { - loop { - let event = self.readiness(interest).await?; - - match op(self.get_ref()) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(event); - } - x => return x, - } - } + use std::io::Read; + + let n = ready!(self.registration.poll_read_io(cx, || { + let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); + self.io.as_ref().unwrap().read(b) + }))?; + + // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the + // buffer. + buf.assume_init(n); + buf.advance(n); + Poll::Ready(Ok(())) } - } -} - -// ===== Read / Write impls ===== - -impl<E: Source + Read + Unpin> AsyncRead for PollEvented<E> { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<()>> { - loop { - let ev = ready!(self.poll_read_ready(cx))?; - - // We can't assume the `Read` won't look at the read buffer, - // so we have to force initialization here. - let r = (*self).get_mut().read(buf.initialize_unfilled()); - if is_wouldblock(&r) { - self.clear_readiness(ev); - continue; - } - - return Poll::Ready(r.map(|n| { - buf.advance(n); - })); + pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> + where + &'a E: io::Write + 'a, + { + use std::io::Write; + self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf)) } } } -impl<E: Source + Write + Unpin> AsyncWrite for PollEvented<E> { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - loop { - let ev = ready!(self.poll_write_ready(cx))?; +impl<E: Source> Deref for PollEvented<E> { + type Target = E; - let r = (*self).get_mut().write(buf); - - if is_wouldblock(&r) { - self.clear_readiness(ev); - continue; - } - - return Poll::Ready(r); - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - loop { - let ev = ready!(self.poll_write_ready(cx))?; - - let r = (*self).get_mut().flush(); - - if is_wouldblock(&r) { - self.clear_readiness(ev); - continue; - } - - return Poll::Ready(r); - } - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { - Poll::Ready(Ok(())) - } -} - -fn is_wouldblock<T>(r: &io::Result<T>) -> bool { - match *r { - Ok(_) => false, - Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + fn deref(&self) -> &E { + self.io.as_ref().unwrap() } } |