diff options
Diffstat (limited to 'tokio-reactor/src/poll_evented.rs')
-rw-r--r-- | tokio-reactor/src/poll_evented.rs | 174 |
1 files changed, 54 insertions, 120 deletions
diff --git a/tokio-reactor/src/poll_evented.rs b/tokio-reactor/src/poll_evented.rs index 140ac4ca..5b280294 100644 --- a/tokio-reactor/src/poll_evented.rs +++ b/tokio-reactor/src/poll_evented.rs @@ -1,11 +1,13 @@ use crate::{Handle, Registration}; -use futures::{task, try_ready, Async, Poll}; use mio; use mio::event::Evented; use std::fmt; use std::io::{self, Read, Write}; +use std::marker::Unpin; +use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; +use std::task::{Context, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; /// Associates an I/O resource that implements the [`std::io::Read`] and/or @@ -116,7 +118,10 @@ macro_rules! poll_ready { // stream. This happens in a loop to ensure that the stream gets // drained. loop { - let ready = try_ready!($poll); + let ready = match $poll? { + Poll::Ready(v) => v, + Poll::Pending => return Poll::Pending, + }; cached |= ready.as_usize(); // Update the cache store @@ -125,7 +130,7 @@ macro_rules! poll_ready { ret |= ready & mask; if !ret.is_empty() { - return Ok(ret.into()); + return Poll::Ready(Ok(ret)); } } } else { @@ -136,7 +141,7 @@ macro_rules! poll_ready { $me.inner.$cache.store(cached, Relaxed); } - Ok(mio::Ready::from_usize(cached).into()) + Poll::Ready(Ok(mio::Ready::from_usize(cached))) } }}; } @@ -217,14 +222,18 @@ where /// /// * `ready` includes writable. /// * called from outside of a task context. - pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> { + pub fn poll_read_ready( + &self, + cx: &mut Context<'_>, + mask: mio::Ready, + ) -> Poll<io::Result<mio::Ready>> { assert!(!mask.is_writable(), "cannot poll for write readiness"); poll_ready!( self, mask, read_readiness, take_read_ready, - self.inner.registration.poll_read_ready() + self.inner.registration.poll_read_ready(cx) ) } @@ -243,7 +252,7 @@ where /// /// * `ready` includes writable or HUP /// * called from outside of a task context. - pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> { + pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> { // Cannot clear write readiness assert!(!ready.is_writable(), "cannot clear write readiness"); assert!( @@ -255,9 +264,9 @@ where .read_readiness .fetch_and(!ready.as_usize(), Relaxed); - if self.poll_read_ready(ready)?.is_ready() { + if self.poll_read_ready(cx, ready)?.is_ready() { // Notify the current task - task::current().notify(); + cx.waker().wake_by_ref(); } Ok(()) @@ -282,13 +291,13 @@ where /// /// * `ready` contains bits besides `writable` and `hup`. /// * called from outside of a task context. - pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { poll_ready!( self, mio::Ready::writable(), write_readiness, take_write_ready, - self.inner.registration.poll_write_ready() + self.inner.registration.poll_write_ready(cx) ) } @@ -304,16 +313,16 @@ where /// # Panics /// /// This function will panic if called from outside of a task context. - pub fn clear_write_ready(&self) -> io::Result<()> { + pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> { let ready = mio::Ready::writable(); self.inner .write_readiness .fetch_and(!ready.as_usize(), Relaxed); - if self.poll_write_ready()?.is_ready() { + if self.poll_write_ready(cx)?.is_ready() { // Notify the current task - task::current().notify(); + cx.waker().wake_by_ref(); } Ok(()) @@ -330,139 +339,64 @@ where // ===== Read / Write impls ===== -impl<E> Read for PollEvented<E> +impl<E> AsyncRead for PollEvented<E> where - E: Evented + Read, + E: Evented + Read + Unpin, { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_mut().read(buf); - - if is_wouldblock(&r) { - self.clear_read_ready(mio::Ready::readable())?; - } - - return r; - } -} - -impl<E> Write for PollEvented<E> -where - E: Evented + Write, -{ - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - if let Async::NotReady = self.poll_write_ready()? { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_mut().write(buf); + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + ready!(self.poll_read_ready(cx, mio::Ready::readable()))?; - if is_wouldblock(&r) { - self.clear_write_ready()?; - } - - return r; - } - - fn flush(&mut self) -> io::Result<()> { - if let Async::NotReady = self.poll_write_ready()? { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_mut().flush(); + let r = (*self).get_mut().read(buf); if is_wouldblock(&r) { - self.clear_write_ready()?; + self.clear_read_ready(cx, mio::Ready::readable())?; + return Poll::Pending; } - return r; + Poll::Ready(r) } } -impl<E> AsyncRead for PollEvented<E> where E: Evented + Read {} - impl<E> AsyncWrite for PollEvented<E> where - E: Evented + Write, + E: Evented + Write + Unpin, { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(().into()) - } -} + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + ready!(self.poll_write_ready(cx))?; -// ===== &'a Read / &'a Write impls ===== - -impl<'a, E> Read for &'a PollEvented<E> -where - E: Evented, - &'a E: Read, -{ - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_ref().read(buf); + let r = (*self).get_mut().write(buf); if is_wouldblock(&r) { - self.clear_read_ready(mio::Ready::readable())?; + self.clear_write_ready(cx)?; + return Poll::Pending; } - return r; + Poll::Ready(r) } -} -impl<'a, E> Write for &'a PollEvented<E> -where - E: Evented, - &'a E: Write, -{ - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - if let Async::NotReady = self.poll_write_ready()? { - return Err(io::ErrorKind::WouldBlock.into()); - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + ready!(self.poll_write_ready(cx))?; - let r = self.get_ref().write(buf); + let r = (*self).get_mut().flush(); if is_wouldblock(&r) { - self.clear_write_ready()?; + self.clear_write_ready(cx)?; + return Poll::Pending; } - return r; + Poll::Ready(r) } - fn flush(&mut self) -> io::Result<()> { - if let Async::NotReady = self.poll_write_ready()? { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_ref().flush(); - - if is_wouldblock(&r) { - self.clear_write_ready()?; - } - - return r; - } -} - -impl<'a, E> AsyncRead for &'a PollEvented<E> -where - E: Evented, - &'a E: Read, -{ -} - -impl<'a, E> AsyncWrite for &'a PollEvented<E> -where - E: Evented, - &'a E: Write, -{ - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(().into()) + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) } } |