summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/poll_evented.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/io/poll_evented.rs')
-rw-r--r--tokio/src/io/poll_evented.rs295
1 files changed, 93 insertions, 202 deletions
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs
index 9054c3b8..2c943ea4 100644
--- a/tokio/src/io/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -1,13 +1,12 @@
-use crate::io::driver::platform;
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf, Registration};
+use crate::io::driver::{Direction, Handle, ReadyEvent};
+use crate::io::registration::Registration;
+use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use mio::event::Evented;
use std::fmt;
use std::io::{self, Read, Write};
use std::marker::Unpin;
use std::pin::Pin;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::Relaxed;
use std::task::{Context, Poll};
cfg_io_driver! {
@@ -53,37 +52,6 @@ cfg_io_driver! {
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
/// [`clear_read_ready`].
///
- /// ```rust
- /// use tokio::io::PollEvented;
- ///
- /// use futures::ready;
- /// use mio::Ready;
- /// use mio::net::{TcpStream, TcpListener};
- /// use std::io;
- /// use std::task::{Context, Poll};
- ///
- /// struct MyListener {
- /// poll_evented: PollEvented<TcpListener>,
- /// }
- ///
- /// impl MyListener {
- /// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> {
- /// let ready = Ready::readable();
- ///
- /// ready!(self.poll_evented.poll_read_ready(cx, ready))?;
- ///
- /// match self.poll_evented.get_ref().accept() {
- /// Ok((socket, _)) => Poll::Ready(Ok(socket)),
- /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- /// self.poll_evented.clear_read_ready(cx, ready)?;
- /// Poll::Pending
- /// }
- /// Err(e) => Poll::Ready(Err(e)),
- /// }
- /// }
- /// }
- /// ```
- ///
/// ## Platform-specific events
///
/// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
@@ -101,66 +69,14 @@ cfg_io_driver! {
/// [`clear_write_ready`]: method@Self::clear_write_ready
/// [`poll_read_ready`]: method@Self::poll_read_ready
/// [`poll_write_ready`]: method@Self::poll_write_ready
- pub struct PollEvented<E: Evented> {
+ pub(crate) struct PollEvented<E: Evented> {
io: Option<E>,
- inner: Inner,
+ registration: Registration,
}
}
-struct Inner {
- registration: Registration,
-
- /// Currently visible read readiness
- read_readiness: AtomicUsize,
-
- /// Currently visible write readiness
- write_readiness: AtomicUsize,
-}
-
// ===== impl PollEvented =====
-macro_rules! poll_ready {
- ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
- // Load cached & encoded readiness.
- let mut cached = $me.inner.$cache.load(Relaxed);
- let mask = $mask | platform::hup() | platform::error();
-
- // See if the current readiness matches any bits.
- let mut ret = mio::Ready::from_usize(cached) & $mask;
-
- if ret.is_empty() {
- // Readiness does not match, consume the registration's readiness
- // stream. This happens in a loop to ensure that the stream gets
- // drained.
- loop {
- let ready = match $poll? {
- Poll::Ready(v) => v,
- Poll::Pending => return Poll::Pending,
- };
- cached |= ready.as_usize();
-
- // Update the cache store
- $me.inner.$cache.store(cached, Relaxed);
-
- ret |= ready & mask;
-
- if !ret.is_empty() {
- return Poll::Ready(Ok(ret));
- }
- }
- } else {
- // Check what's new with the registration stream. This will not
- // request to be notified
- if let Some(ready) = $me.inner.registration.$take()? {
- cached |= ready.as_usize();
- $me.inner.$cache.store(cached, Relaxed);
- }
-
- Poll::Ready(Ok(mio::Ready::from_usize(cached)))
- }
- }};
-}
-
impl<E> PollEvented<E>
where
E: Evented,
@@ -174,7 +90,8 @@ where
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
- pub fn new(io: E) -> io::Result<Self> {
+ #[cfg_attr(feature = "signal", allow(unused))]
+ pub(crate) fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
@@ -202,27 +119,39 @@ where
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
- pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
- let registration = Registration::new_with_ready(&io, ready)?;
+ #[cfg_attr(feature = "signal", allow(unused))]
+ pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
+ Self::new_with_ready_and_handle(io, ready, Handle::current())
+ }
+
+ pub(crate) fn new_with_ready_and_handle(
+ io: E,
+ ready: mio::Ready,
+ handle: Handle,
+ ) -> io::Result<Self> {
+ let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?;
Ok(Self {
io: Some(io),
- inner: Inner {
- registration,
- read_readiness: AtomicUsize::new(0),
- write_readiness: AtomicUsize::new(0),
- },
+ registration,
})
}
/// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping.
- pub fn get_ref(&self) -> &E {
+ #[cfg(any(
+ feature = "process",
+ feature = "tcp",
+ feature = "udp",
+ feature = "uds",
+ feature = "signal"
+ ))]
+ pub(crate) fn get_ref(&self) -> &E {
self.io.as_ref().unwrap()
}
/// Returns a mutable reference to the underlying I/O object this readiness
/// stream is wrapping.
- pub fn get_mut(&mut self) -> &mut E {
+ pub(crate) fn get_mut(&mut self) -> &mut E {
self.io.as_mut().unwrap()
}
@@ -234,12 +163,17 @@ where
/// Note that deregistering does not guarantee that the I/O resource can be
/// registered with a different reactor. Some I/O resource types can only be
/// associated with a single reactor instance for their lifetime.
- pub fn into_inner(mut self) -> io::Result<E> {
+ #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
+ pub(crate) fn into_inner(mut self) -> io::Result<E> {
let io = self.io.take().unwrap();
- self.inner.registration.deregister(&io)?;
+ self.registration.deregister(&io)?;
Ok(io)
}
+ pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
+ self.registration.clear_readiness(event);
+ }
+
/// Checks the I/O resource's read readiness state.
///
/// The mask argument allows specifying what readiness to notify on. This
@@ -266,51 +200,8 @@ where
///
/// This method may not be called concurrently. It takes `&self` to allow
/// calling it concurrently with `poll_write_ready`.
- pub fn poll_read_ready(
- &self,
- cx: &mut Context<'_>,
- mask: mio::Ready,
- ) -> Poll<io::Result<mio::Ready>> {
- assert!(!mask.is_writable(), "cannot poll for write readiness");
- poll_ready!(
- self,
- mask,
- read_readiness,
- take_read_ready,
- self.inner.registration.poll_read_ready(cx)
- )
- }
-
- /// Clears the I/O resource's read readiness state and registers the current
- /// task to be notified once a read readiness event is received.
- ///
- /// After calling this function, `poll_read_ready` will return
- /// `Poll::Pending` until a new read readiness event has been received.
- ///
- /// The `mask` argument specifies the readiness bits to clear. This may not
- /// include `writable` or `hup`.
- ///
- /// # Panics
- ///
- /// This function panics if:
- ///
- /// * `ready` includes writable or HUP
- /// * called from outside of a task context.
- pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> {
- // Cannot clear write readiness
- assert!(!ready.is_writable(), "cannot clear write readiness");
- assert!(!platform::is_hup(ready), "cannot clear HUP readiness");
-
- self.inner
- .read_readiness
- .fetch_and(!ready.as_usize(), Relaxed);
-
- if self.poll_read_ready(cx, ready)?.is_ready() {
- // Notify the current task
- cx.waker().wake_by_ref();
- }
-
- Ok(())
+ pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.registration.poll_readiness(cx, Direction::Read)
}
/// Checks the I/O resource's write readiness state.
@@ -337,41 +228,35 @@ where
///
/// This method may not be called concurrently. It takes `&self` to allow
/// calling it concurrently with `poll_read_ready`.
- pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
- poll_ready!(
- self,
- mio::Ready::writable(),
- write_readiness,
- take_write_ready,
- self.inner.registration.poll_write_ready(cx)
- )
+ pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
+ self.registration.poll_readiness(cx, Direction::Write)
}
+}
- /// Resets the I/O resource's write readiness state and registers the current
- /// task to be notified once a write readiness event is received.
- ///
- /// This only clears writable readiness. HUP (on platforms that support HUP)
- /// cannot be cleared as it is a final state.
- ///
- /// After calling this function, `poll_write_ready(Ready::writable())` will
- /// return `NotReady` until a new write readiness event has been received.
- ///
- /// # Panics
- ///
- /// This function will panic if called from outside of a task context.
- pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> {
- let ready = mio::Ready::writable();
+cfg_io_readiness! {
+ impl<E> PollEvented<E>
+ where
+ E: Evented,
+ {
+ pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> {
+ self.registration.readiness(interest).await
+ }
- self.inner
- .write_readiness
- .fetch_and(!ready.as_usize(), Relaxed);
+ pub(crate) async fn async_io<F, R>(&self, interest: mio::Ready, mut op: F) -> io::Result<R>
+ where
+ F: FnMut(&E) -> io::Result<R>,
+ {
+ loop {
+ let event = self.readiness(interest).await?;
- if self.poll_write_ready(cx)?.is_ready() {
- // Notify the current task
- cx.waker().wake_by_ref();
+ match op(self.get_ref()) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.clear_readiness(event);
+ }
+ x => return x,
+ }
+ }
}
-
- Ok(())
}
}
@@ -386,20 +271,22 @@ where
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
- ready!(self.poll_read_ready(cx, mio::Ready::readable()))?;
+ loop {
+ let ev = ready!(self.poll_read_ready(cx))?;
- // We can't assume the `Read` won't look at the read buffer,
- // so we have to force initialization here.
- let r = (*self).get_mut().read(buf.initialize_unfilled());
+ // We can't assume the `Read` won't look at the read buffer,
+ // so we have to force initialization here.
+ let r = (*self).get_mut().read(buf.initialize_unfilled());
- if is_wouldblock(&r) {
- self.clear_read_ready(cx, mio::Ready::readable())?;
- return Poll::Pending;
- }
+ if is_wouldblock(&r) {
+ self.clear_readiness(ev);
+ continue;
+ }
- Poll::Ready(r.map(|n| {
- buf.add_filled(n);
- }))
+ return Poll::Ready(r.map(|n| {
+ buf.add_filled(n);
+ }));
+ }
}
}
@@ -412,29 +299,33 @@ where
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- ready!(self.poll_write_ready(cx))?;
+ loop {
+ let ev = ready!(self.poll_write_ready(cx))?;
- let r = (*self).get_mut().write(buf);
+ let r = (*self).get_mut().write(buf);
- if is_wouldblock(&r) {
- self.clear_write_ready(cx)?;
- return Poll::Pending;
- }
+ if is_wouldblock(&r) {
+ self.clear_readiness(ev);
+ continue;
+ }
- Poll::Ready(r)
+ return Poll::Ready(r);
+ }
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- ready!(self.poll_write_ready(cx))?;
+ loop {
+ let ev = ready!(self.poll_write_ready(cx))?;
- let r = (*self).get_mut().flush();
+ let r = (*self).get_mut().flush();
- if is_wouldblock(&r) {
- self.clear_write_ready(cx)?;
- return Poll::Pending;
- }
+ if is_wouldblock(&r) {
+ self.clear_readiness(ev);
+ continue;
+ }
- Poll::Ready(r)
+ return Poll::Ready(r);
+ }
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@@ -459,7 +350,7 @@ impl<E: Evented> Drop for PollEvented<E> {
fn drop(&mut self) {
if let Some(io) = self.io.take() {
// Ignore errors
- let _ = self.inner.registration.deregister(&io);
+ let _ = self.registration.deregister(&io);
}
}
}