summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/poll_evented.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-11 09:28:21 -0800
committerGitHub <noreply@github.com>2020-11-11 09:28:21 -0800
commitce891a4df17e632f7557dd0cd1f1e8da89bd6ae4 (patch)
treefa5478c0b3bacacfc65bfbadbe1cdb92234d5b5f /tokio/src/io/poll_evented.rs
parentd869e16990c5fc2cbda48b036708efa4b450e807 (diff)
io: driver internal cleanup (#3124)
* Removes duplicated code by moving it to `Registration`. * impl `Deref` for `PollEvented` to avoid `get_ref()`. * Avoid extra waker clones in I/O driver. * Add `Interest` wrapper around `mio::Interest`.
Diffstat (limited to 'tokio/src/io/poll_evented.rs')
-rw-r--r--tokio/src/io/poll_evented.rs228
1 files changed, 53 insertions, 175 deletions
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs
index 66a26346..803932ba 100644
--- a/tokio/src/io/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -1,13 +1,9 @@
-use crate::io::driver::{Direction, Handle, ReadyEvent};
-use crate::io::registration::Registration;
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::driver::{Handle, Interest, Registration};
use mio::event::Source;
use std::fmt;
-use std::io::{self, Read, Write};
-use std::marker::Unpin;
-use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::io;
+use std::ops::Deref;
cfg_io_driver! {
/// Associates an I/O resource that implements the [`std::io::Read`] and/or
@@ -89,30 +85,32 @@ impl<E: Source> PollEvented<E> {
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new(io: E) -> io::Result<Self> {
- PollEvented::new_with_interest(io, mio::Interest::READABLE | mio::Interest::WRITABLE)
+ PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
}
- /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Interest`
- /// state. `new_with_interest` should be used over `new` when you need control over the readiness
- /// state, such as when a file descriptor only allows reads. This does not add `hup` or `error`
- /// so if you are interested in those states, you will need to add them to the readiness state
- /// passed to this function.
+ /// Creates a new `PollEvented` associated with the default reactor, for
+ /// specific `Interest` state. `new_with_interest` should be used over `new`
+ /// when you need control over the readiness state, such as when a file
+ /// descriptor only allows reads. This does not add `hup` or `error` so if
+ /// you are interested in those states, you will need to add them to the
+ /// readiness state passed to this function.
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
- /// The runtime is usually set implicitly when this function is called
- /// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ /// The runtime is usually set implicitly when this function is called from
+ /// a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
+ /// function.
#[cfg_attr(feature = "signal", allow(unused))]
- pub(crate) fn new_with_interest(io: E, interest: mio::Interest) -> io::Result<Self> {
+ pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
Self::new_with_interest_and_handle(io, interest, Handle::current())
}
pub(crate) fn new_with_interest_and_handle(
mut io: E,
- interest: mio::Interest,
+ interest: Interest,
handle: Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
@@ -122,177 +120,57 @@ impl<E: Source> PollEvented<E> {
})
}
- /// Returns a shared reference to the underlying I/O object this readiness
- /// stream is wrapping.
- #[cfg(any(feature = "net", feature = "process", feature = "signal"))]
- pub(crate) fn get_ref(&self) -> &E {
- self.io.as_ref().unwrap()
- }
-
- /// Returns a mutable reference to the underlying I/O object this readiness
- /// stream is wrapping.
- pub(crate) fn get_mut(&mut self) -> &mut E {
- self.io.as_mut().unwrap()
- }
-
- pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
- self.registration.clear_readiness(event);
+ /// Returns a reference to the registration
+ pub(crate) fn registration(&self) -> &Registration {
+ &self.registration
}
+}
- /// Checks the I/O resource's read readiness state.
- ///
- /// The mask argument allows specifying what readiness to notify on. This
- /// can be any value, including platform specific readiness, **except**
- /// `writable`. HUP is always implicitly included on platforms that support
- /// it.
- ///
- /// If the resource is not ready for a read then `Poll::Pending` is returned
- /// and the current task is notified once a new event is received.
- ///
- /// The I/O resource will remain in a read-ready state until readiness is
- /// cleared by calling [`clear_read_ready`].
- ///
- /// [`clear_read_ready`]: method@Self::clear_read_ready
- ///
- /// # Panics
- ///
- /// This function panics if:
- ///
- /// * `ready` includes writable.
- /// * called from outside of a task context.
- ///
- /// # Warning
- ///
- /// This method may not be called concurrently. It takes `&self` to allow
- /// calling it concurrently with `poll_write_ready`.
- pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.registration.poll_readiness(cx, Direction::Read)
- }
+feature! {
+ #![any(feature = "net", feature = "process")]
- /// Checks the I/O resource's write readiness state.
- ///
- /// This always checks for writable readiness and also checks for HUP
- /// readiness on platforms that support it.
- ///
- /// If the resource is not ready for a write then `Poll::Pending` is
- /// returned and the current task is notified once a new event is received.
- ///
- /// The I/O resource will remain in a write-ready state until readiness is
- /// cleared by calling [`clear_write_ready`].
- ///
- /// [`clear_write_ready`]: method@Self::clear_write_ready
- ///
- /// # Panics
- ///
- /// This function panics if:
- ///
- /// * `ready` contains bits besides `writable` and `hup`.
- /// * called from outside of a task context.
- ///
- /// # Warning
- ///
- /// This method may not be called concurrently. It takes `&self` to allow
- /// calling it concurrently with `poll_read_ready`.
- pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
- self.registration.poll_readiness(cx, Direction::Write)
- }
-}
+ use crate::io::ReadBuf;
+ use std::task::{Context, Poll};
-cfg_io_readiness! {
impl<E: Source> PollEvented<E> {
- pub(crate) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
- self.registration.readiness(interest).await
- }
-
- pub(crate) async fn async_io<F, R>(&self, interest: mio::Interest, mut op: F) -> io::Result<R>
+ // Safety: The caller must ensure that `E` can read into uninitialized memory
+ pub(crate) unsafe fn poll_read<'a>(
+ &'a self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>>
where
- F: FnMut(&E) -> io::Result<R>,
+ &'a E: io::Read + 'a,
{
- loop {
- let event = self.readiness(interest).await?;
-
- match op(self.get_ref()) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.clear_readiness(event);
- }
- x => return x,
- }
- }
+ use std::io::Read;
+
+ let n = ready!(self.registration.poll_read_io(cx, || {
+ let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
+ self.io.as_ref().unwrap().read(b)
+ }))?;
+
+ // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
+ // buffer.
+ buf.assume_init(n);
+ buf.advance(n);
+ Poll::Ready(Ok(()))
}
- }
-}
-
-// ===== Read / Write impls =====
-
-impl<E: Source + Read + Unpin> AsyncRead for PollEvented<E> {
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- loop {
- let ev = ready!(self.poll_read_ready(cx))?;
-
- // We can't assume the `Read` won't look at the read buffer,
- // so we have to force initialization here.
- let r = (*self).get_mut().read(buf.initialize_unfilled());
- if is_wouldblock(&r) {
- self.clear_readiness(ev);
- continue;
- }
-
- return Poll::Ready(r.map(|n| {
- buf.advance(n);
- }));
+ pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
+ where
+ &'a E: io::Write + 'a,
+ {
+ use std::io::Write;
+ self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf))
}
}
}
-impl<E: Source + Write + Unpin> AsyncWrite for PollEvented<E> {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- loop {
- let ev = ready!(self.poll_write_ready(cx))?;
+impl<E: Source> Deref for PollEvented<E> {
+ type Target = E;
- let r = (*self).get_mut().write(buf);
-
- if is_wouldblock(&r) {
- self.clear_readiness(ev);
- continue;
- }
-
- return Poll::Ready(r);
- }
- }
-
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- loop {
- let ev = ready!(self.poll_write_ready(cx))?;
-
- let r = (*self).get_mut().flush();
-
- if is_wouldblock(&r) {
- self.clear_readiness(ev);
- continue;
- }
-
- return Poll::Ready(r);
- }
- }
-
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Poll::Ready(Ok(()))
- }
-}
-
-fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
- match *r {
- Ok(_) => false,
- Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
+ fn deref(&self) -> &E {
+ self.io.as_ref().unwrap()
}
}