diff options
Diffstat (limited to 'tokio/src/io/poll_evented.rs')
-rw-r--r-- | tokio/src/io/poll_evented.rs | 295 |
1 files changed, 93 insertions, 202 deletions
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 9054c3b8..2c943ea4 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,13 +1,12 @@ -use crate::io::driver::platform; -use crate::io::{AsyncRead, AsyncWrite, ReadBuf, Registration}; +use crate::io::driver::{Direction, Handle, ReadyEvent}; +use crate::io::registration::Registration; +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use mio::event::Evented; use std::fmt; use std::io::{self, Read, Write}; use std::marker::Unpin; use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; use std::task::{Context, Poll}; cfg_io_driver! { @@ -53,37 +52,6 @@ cfg_io_driver! { /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and /// [`clear_read_ready`]. /// - /// ```rust - /// use tokio::io::PollEvented; - /// - /// use futures::ready; - /// use mio::Ready; - /// use mio::net::{TcpStream, TcpListener}; - /// use std::io; - /// use std::task::{Context, Poll}; - /// - /// struct MyListener { - /// poll_evented: PollEvented<TcpListener>, - /// } - /// - /// impl MyListener { - /// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> { - /// let ready = Ready::readable(); - /// - /// ready!(self.poll_evented.poll_read_ready(cx, ready))?; - /// - /// match self.poll_evented.get_ref().accept() { - /// Ok((socket, _)) => Poll::Ready(Ok(socket)), - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// self.poll_evented.clear_read_ready(cx, ready)?; - /// Poll::Pending - /// } - /// Err(e) => Poll::Ready(Err(e)), - /// } - /// } - /// } - /// ``` - /// /// ## Platform-specific events /// /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. @@ -101,66 +69,14 @@ cfg_io_driver! { /// [`clear_write_ready`]: method@Self::clear_write_ready /// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_write_ready`]: method@Self::poll_write_ready - pub struct PollEvented<E: Evented> { + pub(crate) struct PollEvented<E: Evented> { io: Option<E>, - inner: Inner, + registration: Registration, } } -struct Inner { - registration: Registration, - - /// Currently visible read readiness - read_readiness: AtomicUsize, - - /// Currently visible write readiness - write_readiness: AtomicUsize, -} - // ===== impl PollEvented ===== -macro_rules! poll_ready { - ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{ - // Load cached & encoded readiness. - let mut cached = $me.inner.$cache.load(Relaxed); - let mask = $mask | platform::hup() | platform::error(); - - // See if the current readiness matches any bits. - let mut ret = mio::Ready::from_usize(cached) & $mask; - - if ret.is_empty() { - // Readiness does not match, consume the registration's readiness - // stream. This happens in a loop to ensure that the stream gets - // drained. - loop { - let ready = match $poll? { - Poll::Ready(v) => v, - Poll::Pending => return Poll::Pending, - }; - cached |= ready.as_usize(); - - // Update the cache store - $me.inner.$cache.store(cached, Relaxed); - - ret |= ready & mask; - - if !ret.is_empty() { - return Poll::Ready(Ok(ret)); - } - } - } else { - // Check what's new with the registration stream. This will not - // request to be notified - if let Some(ready) = $me.inner.registration.$take()? { - cached |= ready.as_usize(); - $me.inner.$cache.store(cached, Relaxed); - } - - Poll::Ready(Ok(mio::Ready::from_usize(cached))) - } - }}; -} - impl<E> PollEvented<E> where E: Evented, @@ -174,7 +90,8 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new(io: E) -> io::Result<Self> { + #[cfg_attr(feature = "signal", allow(unused))] + pub(crate) fn new(io: E) -> io::Result<Self> { PollEvented::new_with_ready(io, mio::Ready::all()) } @@ -202,27 +119,39 @@ where /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> { - let registration = Registration::new_with_ready(&io, ready)?; + #[cfg_attr(feature = "signal", allow(unused))] + pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> { + Self::new_with_ready_and_handle(io, ready, Handle::current()) + } + + pub(crate) fn new_with_ready_and_handle( + io: E, + ready: mio::Ready, + handle: Handle, + ) -> io::Result<Self> { + let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?; Ok(Self { io: Some(io), - inner: Inner { - registration, - read_readiness: AtomicUsize::new(0), - write_readiness: AtomicUsize::new(0), - }, + registration, }) } /// Returns a shared reference to the underlying I/O object this readiness /// stream is wrapping. - pub fn get_ref(&self) -> &E { + #[cfg(any( + feature = "process", + feature = "tcp", + feature = "udp", + feature = "uds", + feature = "signal" + ))] + pub(crate) fn get_ref(&self) -> &E { self.io.as_ref().unwrap() } /// Returns a mutable reference to the underlying I/O object this readiness /// stream is wrapping. - pub fn get_mut(&mut self) -> &mut E { + pub(crate) fn get_mut(&mut self) -> &mut E { self.io.as_mut().unwrap() } @@ -234,12 +163,17 @@ where /// Note that deregistering does not guarantee that the I/O resource can be /// registered with a different reactor. Some I/O resource types can only be /// associated with a single reactor instance for their lifetime. - pub fn into_inner(mut self) -> io::Result<E> { + #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] + pub(crate) fn into_inner(mut self) -> io::Result<E> { let io = self.io.take().unwrap(); - self.inner.registration.deregister(&io)?; + self.registration.deregister(&io)?; Ok(io) } + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + self.registration.clear_readiness(event); + } + /// Checks the I/O resource's read readiness state. /// /// The mask argument allows specifying what readiness to notify on. This @@ -266,51 +200,8 @@ where /// /// This method may not be called concurrently. It takes `&self` to allow /// calling it concurrently with `poll_write_ready`. - pub fn poll_read_ready( - &self, - cx: &mut Context<'_>, - mask: mio::Ready, - ) -> Poll<io::Result<mio::Ready>> { - assert!(!mask.is_writable(), "cannot poll for write readiness"); - poll_ready!( - self, - mask, - read_readiness, - take_read_ready, - self.inner.registration.poll_read_ready(cx) - ) - } - - /// Clears the I/O resource's read readiness state and registers the current - /// task to be notified once a read readiness event is received. - /// - /// After calling this function, `poll_read_ready` will return - /// `Poll::Pending` until a new read readiness event has been received. - /// - /// The `mask` argument specifies the readiness bits to clear. This may not - /// include `writable` or `hup`. - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` includes writable or HUP - /// * called from outside of a task context. - pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> { - // Cannot clear write readiness - assert!(!ready.is_writable(), "cannot clear write readiness"); - assert!(!platform::is_hup(ready), "cannot clear HUP readiness"); - - self.inner - .read_readiness - .fetch_and(!ready.as_usize(), Relaxed); - - if self.poll_read_ready(cx, ready)?.is_ready() { - // Notify the current task - cx.waker().wake_by_ref(); - } - - Ok(()) + pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.registration.poll_readiness(cx, Direction::Read) } /// Checks the I/O resource's write readiness state. @@ -337,41 +228,35 @@ where /// /// This method may not be called concurrently. It takes `&self` to allow /// calling it concurrently with `poll_read_ready`. - pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { - poll_ready!( - self, - mio::Ready::writable(), - write_readiness, - take_write_ready, - self.inner.registration.poll_write_ready(cx) - ) + pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> { + self.registration.poll_readiness(cx, Direction::Write) } +} - /// Resets the I/O resource's write readiness state and registers the current - /// task to be notified once a write readiness event is received. - /// - /// This only clears writable readiness. HUP (on platforms that support HUP) - /// cannot be cleared as it is a final state. - /// - /// After calling this function, `poll_write_ready(Ready::writable())` will - /// return `NotReady` until a new write readiness event has been received. - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> { - let ready = mio::Ready::writable(); +cfg_io_readiness! { + impl<E> PollEvented<E> + where + E: Evented, + { + pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> { + self.registration.readiness(interest).await + } - self.inner - .write_readiness - .fetch_and(!ready.as_usize(), Relaxed); + pub(crate) async fn async_io<F, R>(&self, interest: mio::Ready, mut op: F) -> io::Result<R> + where + F: FnMut(&E) -> io::Result<R>, + { + loop { + let event = self.readiness(interest).await?; - if self.poll_write_ready(cx)?.is_ready() { - // Notify the current task - cx.waker().wake_by_ref(); + match op(self.get_ref()) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(event); + } + x => return x, + } + } } - - Ok(()) } } @@ -386,20 +271,22 @@ where cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - ready!(self.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.poll_read_ready(cx))?; - // We can't assume the `Read` won't look at the read buffer, - // so we have to force initialization here. - let r = (*self).get_mut().read(buf.initialize_unfilled()); + // We can't assume the `Read` won't look at the read buffer, + // so we have to force initialization here. + let r = (*self).get_mut().read(buf.initialize_unfilled()); - if is_wouldblock(&r) { - self.clear_read_ready(cx, mio::Ready::readable())?; - return Poll::Pending; - } + if is_wouldblock(&r) { + self.clear_readiness(ev); + continue; + } - Poll::Ready(r.map(|n| { - buf.add_filled(n); - })) + return Poll::Ready(r.map(|n| { + buf.add_filled(n); + })); + } } } @@ -412,29 +299,33 @@ where cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - ready!(self.poll_write_ready(cx))?; + loop { + let ev = ready!(self.poll_write_ready(cx))?; - let r = (*self).get_mut().write(buf); + let r = (*self).get_mut().write(buf); - if is_wouldblock(&r) { - self.clear_write_ready(cx)?; - return Poll::Pending; - } + if is_wouldblock(&r) { + self.clear_readiness(ev); + continue; + } - Poll::Ready(r) + return Poll::Ready(r); + } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { - ready!(self.poll_write_ready(cx))?; + loop { + let ev = ready!(self.poll_write_ready(cx))?; - let r = (*self).get_mut().flush(); + let r = (*self).get_mut().flush(); - if is_wouldblock(&r) { - self.clear_write_ready(cx)?; - return Poll::Pending; - } + if is_wouldblock(&r) { + self.clear_readiness(ev); + continue; + } - Poll::Ready(r) + return Poll::Ready(r); + } } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { @@ -459,7 +350,7 @@ impl<E: Evented> Drop for PollEvented<E> { fn drop(&mut self) { if let Some(io) = self.io.take() { // Ignore errors - let _ = self.inner.registration.deregister(&io); + let _ = self.registration.deregister(&io); } } } |