summaryrefslogtreecommitdiffstats
path: root/tokio-reactor/src/poll_evented.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-reactor/src/poll_evented.rs')
-rw-r--r--tokio-reactor/src/poll_evented.rs174
1 files changed, 54 insertions, 120 deletions
diff --git a/tokio-reactor/src/poll_evented.rs b/tokio-reactor/src/poll_evented.rs
index 140ac4ca..5b280294 100644
--- a/tokio-reactor/src/poll_evented.rs
+++ b/tokio-reactor/src/poll_evented.rs
@@ -1,11 +1,13 @@
use crate::{Handle, Registration};
-use futures::{task, try_ready, Async, Poll};
use mio;
use mio::event::Evented;
use std::fmt;
use std::io::{self, Read, Write};
+use std::marker::Unpin;
+use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
+use std::task::{Context, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
/// Associates an I/O resource that implements the [`std::io::Read`] and/or
@@ -116,7 +118,10 @@ macro_rules! poll_ready {
// stream. This happens in a loop to ensure that the stream gets
// drained.
loop {
- let ready = try_ready!($poll);
+ let ready = match $poll? {
+ Poll::Ready(v) => v,
+ Poll::Pending => return Poll::Pending,
+ };
cached |= ready.as_usize();
// Update the cache store
@@ -125,7 +130,7 @@ macro_rules! poll_ready {
ret |= ready & mask;
if !ret.is_empty() {
- return Ok(ret.into());
+ return Poll::Ready(Ok(ret));
}
}
} else {
@@ -136,7 +141,7 @@ macro_rules! poll_ready {
$me.inner.$cache.store(cached, Relaxed);
}
- Ok(mio::Ready::from_usize(cached).into())
+ Poll::Ready(Ok(mio::Ready::from_usize(cached)))
}
}};
}
@@ -217,14 +222,18 @@ where
///
/// * `ready` includes writable.
/// * called from outside of a task context.
- pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
+ pub fn poll_read_ready(
+ &self,
+ cx: &mut Context<'_>,
+ mask: mio::Ready,
+ ) -> Poll<io::Result<mio::Ready>> {
assert!(!mask.is_writable(), "cannot poll for write readiness");
poll_ready!(
self,
mask,
read_readiness,
take_read_ready,
- self.inner.registration.poll_read_ready()
+ self.inner.registration.poll_read_ready(cx)
)
}
@@ -243,7 +252,7 @@ where
///
/// * `ready` includes writable or HUP
/// * called from outside of a task context.
- pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
+ pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> {
// Cannot clear write readiness
assert!(!ready.is_writable(), "cannot clear write readiness");
assert!(
@@ -255,9 +264,9 @@ where
.read_readiness
.fetch_and(!ready.as_usize(), Relaxed);
- if self.poll_read_ready(ready)?.is_ready() {
+ if self.poll_read_ready(cx, ready)?.is_ready() {
// Notify the current task
- task::current().notify();
+ cx.waker().wake_by_ref();
}
Ok(())
@@ -282,13 +291,13 @@ where
///
/// * `ready` contains bits besides `writable` and `hup`.
/// * called from outside of a task context.
- pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
+ pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
poll_ready!(
self,
mio::Ready::writable(),
write_readiness,
take_write_ready,
- self.inner.registration.poll_write_ready()
+ self.inner.registration.poll_write_ready(cx)
)
}
@@ -304,16 +313,16 @@ where
/// # Panics
///
/// This function will panic if called from outside of a task context.
- pub fn clear_write_ready(&self) -> io::Result<()> {
+ pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> {
let ready = mio::Ready::writable();
self.inner
.write_readiness
.fetch_and(!ready.as_usize(), Relaxed);
- if self.poll_write_ready()?.is_ready() {
+ if self.poll_write_ready(cx)?.is_ready() {
// Notify the current task
- task::current().notify();
+ cx.waker().wake_by_ref();
}
Ok(())
@@ -330,139 +339,64 @@ where
// ===== Read / Write impls =====
-impl<E> Read for PollEvented<E>
+impl<E> AsyncRead for PollEvented<E>
where
- E: Evented + Read,
+ E: Evented + Read + Unpin,
{
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().read(buf);
-
- if is_wouldblock(&r) {
- self.clear_read_ready(mio::Ready::readable())?;
- }
-
- return r;
- }
-}
-
-impl<E> Write for PollEvented<E>
-where
- E: Evented + Write,
-{
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_write_ready()? {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().write(buf);
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ ready!(self.poll_read_ready(cx, mio::Ready::readable()))?;
- if is_wouldblock(&r) {
- self.clear_write_ready()?;
- }
-
- return r;
- }
-
- fn flush(&mut self) -> io::Result<()> {
- if let Async::NotReady = self.poll_write_ready()? {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().flush();
+ let r = (*self).get_mut().read(buf);
if is_wouldblock(&r) {
- self.clear_write_ready()?;
+ self.clear_read_ready(cx, mio::Ready::readable())?;
+ return Poll::Pending;
}
- return r;
+ Poll::Ready(r)
}
}
-impl<E> AsyncRead for PollEvented<E> where E: Evented + Read {}
-
impl<E> AsyncWrite for PollEvented<E>
where
- E: Evented + Write,
+ E: Evented + Write + Unpin,
{
- fn shutdown(&mut self) -> Poll<(), io::Error> {
- Ok(().into())
- }
-}
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ ready!(self.poll_write_ready(cx))?;
-// ===== &'a Read / &'a Write impls =====
-
-impl<'a, E> Read for &'a PollEvented<E>
-where
- E: Evented,
- &'a E: Read,
-{
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_ref().read(buf);
+ let r = (*self).get_mut().write(buf);
if is_wouldblock(&r) {
- self.clear_read_ready(mio::Ready::readable())?;
+ self.clear_write_ready(cx)?;
+ return Poll::Pending;
}
- return r;
+ Poll::Ready(r)
}
-}
-impl<'a, E> Write for &'a PollEvented<E>
-where
- E: Evented,
- &'a E: Write,
-{
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_write_ready()? {
- return Err(io::ErrorKind::WouldBlock.into());
- }
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ ready!(self.poll_write_ready(cx))?;
- let r = self.get_ref().write(buf);
+ let r = (*self).get_mut().flush();
if is_wouldblock(&r) {
- self.clear_write_ready()?;
+ self.clear_write_ready(cx)?;
+ return Poll::Pending;
}
- return r;
+ Poll::Ready(r)
}
- fn flush(&mut self) -> io::Result<()> {
- if let Async::NotReady = self.poll_write_ready()? {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_ref().flush();
-
- if is_wouldblock(&r) {
- self.clear_write_ready()?;
- }
-
- return r;
- }
-}
-
-impl<'a, E> AsyncRead for &'a PollEvented<E>
-where
- E: Evented,
- &'a E: Read,
-{
-}
-
-impl<'a, E> AsyncWrite for &'a PollEvented<E>
-where
- E: Evented,
- &'a E: Write,
-{
- fn shutdown(&mut self) -> Poll<(), io::Error> {
- Ok(().into())
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
}
}