summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-02 15:15:05 -0800
committerGitHub <noreply@github.com>2018-03-02 15:15:05 -0800
commit21c0f3a9d814e37222a70b18c55921a2395804ee (patch)
tree6d513defb4ff76320983dcb2b2f2ea13f7fbe1a5
parente1b30851537bccef87c37bd306a2661418083bc1 (diff)
Add AsyncRead::poll_read, AsyncWrite::poll_write. (#170)
This removes the need for the `try_nb` macro as well as bring the traits closer in line with the planed 0.2 iteration.
-rw-r--r--tokio-io/src/async_read.rs20
-rw-r--r--tokio-io/src/async_write.rs42
-rw-r--r--tokio-io/src/framed_write.rs4
-rw-r--r--tokio-io/src/io/copy.rs6
-rw-r--r--tokio-io/src/io/flush.rs2
-rw-r--r--tokio-io/src/io/read.rs2
-rw-r--r--tokio-io/src/io/read_exact.rs2
-rw-r--r--tokio-io/src/io/write_all.rs2
-rw-r--r--tokio-io/src/length_delimited.rs2
9 files changed, 68 insertions, 14 deletions
diff --git a/tokio-io/src/async_read.rs b/tokio-io/src/async_read.rs
index 08d851ef..804aca7d 100644
--- a/tokio-io/src/async_read.rs
+++ b/tokio-io/src/async_read.rs
@@ -67,6 +67,24 @@ pub trait AsyncRead: std_io::Read {
true
}
+ /// Attempt to read from the `AsyncRead` into `buf`.
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
+ ///
+ /// If no data is available for reading, the method returns
+ /// `Ok(Async::Pending)` and arranges for the current task (via
+ /// `cx.waker()`) to receive a notification when the object becomes
+ /// readable or is closed.
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std_io::Error> {
+ match self.read(buf) {
+ Ok(t) => Ok(Async::Ready(t)),
+ Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => {
+ return Ok(Async::NotReady)
+ }
+ Err(e) => return Err(e.into()),
+ }
+ }
+
/// Pull some bytes from this source into the specified `Buf`, returning
/// how many bytes were read.
///
@@ -86,7 +104,7 @@ pub trait AsyncRead: std_io::Read {
self.prepare_uninitialized_buffer(b);
- try_nb!(self.read(b))
+ try_ready!(self.poll_read(b))
};
buf.advance_mut(n);
diff --git a/tokio-io/src/async_write.rs b/tokio-io/src/async_write.rs
index 8c27ee12..514a8ec1 100644
--- a/tokio-io/src/async_write.rs
+++ b/tokio-io/src/async_write.rs
@@ -1,5 +1,4 @@
use std::io as std_io;
-use std::io::Write;
use bytes::Buf;
use futures::{Async, Poll};
@@ -35,6 +34,43 @@ use AsyncRead;
/// current task is ready to receive a notification when flushing can make more
/// progress, and otherwise normal errors can happen as well.
pub trait AsyncWrite: std_io::Write {
+ /// Attempt to write bytes from `buf` into the object.
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
+ ///
+ /// If the object is not ready for writing, the method returns
+ /// `Ok(Async::Pending)` and arranges for the current task (via
+ /// `cx.waker()`) to receive a notification when the object becomes
+ /// readable or is closed.
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> {
+ match self.write(buf) {
+ Ok(t) => Ok(Async::Ready(t)),
+ Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => {
+ return Ok(Async::NotReady)
+ }
+ Err(e) => return Err(e.into()),
+ }
+ }
+
+ /// Attempt to flush the object, ensuring that any buffered data reach
+ /// their destination.
+ ///
+ /// On success, returns `Ok(Async::Ready(()))`.
+ ///
+ /// If flushing cannot immediately complete, this method returns
+ /// `Ok(Async::Pending)` and arranges for the current task (via
+ /// `cx.waker()`) to receive a notification when the object can make
+ /// progress towards flushing.
+ fn poll_flush(&mut self) -> Poll<(), std_io::Error> {
+ match self.flush() {
+ Ok(t) => Ok(Async::Ready(t)),
+ Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => {
+ return Ok(Async::NotReady)
+ }
+ Err(e) => return Err(e.into()),
+ }
+ }
+
/// Initiates or attempts to shut down this writer, returning success when
/// the I/O connection has completely shut down.
///
@@ -106,7 +142,7 @@ pub trait AsyncWrite: std_io::Write {
return Ok(Async::Ready(0));
}
- let n = try_nb!(self.write(buf.bytes()));
+ let n = try_ready!(self.poll_write(buf.bytes()));
buf.advance(n);
Ok(Async::Ready(n))
}
@@ -150,7 +186,7 @@ impl<T, U> AsyncRead for std_io::Chain<T, U>
impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> {
fn shutdown(&mut self) -> Poll<(), std_io::Error> {
- try_nb!(self.flush());
+ try_ready!(self.poll_flush());
self.get_mut().shutdown()
}
}
diff --git a/tokio-io/src/framed_write.rs b/tokio-io/src/framed_write.rs
index e16ad3f8..8c60426c 100644
--- a/tokio-io/src/framed_write.rs
+++ b/tokio-io/src/framed_write.rs
@@ -184,7 +184,7 @@ impl<T> Sink for FramedWrite2<T>
while !self.buffer.is_empty() {
trace!("writing; remaining={}", self.buffer.len());
- let n = try_nb!(self.inner.write(&self.buffer));
+ let n = try_ready!(self.inner.poll_write(&self.buffer));
if n == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to
@@ -197,7 +197,7 @@ impl<T> Sink for FramedWrite2<T>
}
// Try flushing the underlying IO
- try_nb!(self.inner.flush());
+ try_ready!(self.inner.poll_flush());
trace!("framed transport flushed");
return Ok(Async::Ready(()));
diff --git a/tokio-io/src/io/copy.rs b/tokio-io/src/io/copy.rs
index 8b8c0fe4..b21dc1c4 100644
--- a/tokio-io/src/io/copy.rs
+++ b/tokio-io/src/io/copy.rs
@@ -60,7 +60,7 @@ impl<R, W> Future for Copy<R, W>
// continue.
if self.pos == self.cap && !self.read_done {
let reader = self.reader.as_mut().unwrap();
- let n = try_nb!(reader.read(&mut self.buf));
+ let n = try_ready!(reader.poll_read(&mut self.buf));
if n == 0 {
self.read_done = true;
} else {
@@ -72,7 +72,7 @@ impl<R, W> Future for Copy<R, W>
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let writer = self.writer.as_mut().unwrap();
- let i = try_nb!(writer.write(&self.buf[self.pos..self.cap]));
+ let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
if i == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero,
"write zero byte into writer"));
@@ -86,7 +86,7 @@ impl<R, W> Future for Copy<R, W>
// data and finish the transfer.
// done with the entire transfer.
if self.pos == self.cap && self.read_done {
- try_nb!(self.writer.as_mut().unwrap().flush());
+ try_ready!(self.writer.as_mut().unwrap().poll_flush());
let reader = self.reader.take().unwrap();
let writer = self.writer.take().unwrap();
return Ok((self.amt, reader, writer).into())
diff --git a/tokio-io/src/io/flush.rs b/tokio-io/src/io/flush.rs
index 29065f9f..dabdc6c9 100644
--- a/tokio-io/src/io/flush.rs
+++ b/tokio-io/src/io/flush.rs
@@ -37,7 +37,7 @@ impl<A> Future for Flush<A>
type Error = io::Error;
fn poll(&mut self) -> Poll<A, io::Error> {
- try_nb!(self.a.as_mut().unwrap().flush());
+ try_ready!(self.a.as_mut().unwrap().poll_flush());
Ok(Async::Ready(self.a.take().unwrap()))
}
}
diff --git a/tokio-io/src/io/read.rs b/tokio-io/src/io/read.rs
index abfb459c..4c5a9665 100644
--- a/tokio-io/src/io/read.rs
+++ b/tokio-io/src/io/read.rs
@@ -44,7 +44,7 @@ impl<R, T> Future for Read<R, T>
fn poll(&mut self) -> Poll<(R, T, usize), io::Error> {
let nread = match self.state {
- State::Pending { ref mut rd, ref mut buf } => try_nb!(rd.read(&mut buf.as_mut()[..])),
+ State::Pending { ref mut rd, ref mut buf } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])),
State::Empty => panic!("poll a Read after it's done"),
};
diff --git a/tokio-io/src/io/read_exact.rs b/tokio-io/src/io/read_exact.rs
index 14251242..b1e16440 100644
--- a/tokio-io/src/io/read_exact.rs
+++ b/tokio-io/src/io/read_exact.rs
@@ -65,7 +65,7 @@ impl<A, T> Future for ReadExact<A, T>
State::Reading { ref mut a, ref mut buf, ref mut pos } => {
let buf = buf.as_mut();
while *pos < buf.len() {
- let n = try_nb!(a.read(&mut buf[*pos..]));
+ let n = try_ready!(a.poll_read(&mut buf[*pos..]));
*pos += n;
if n == 0 {
return Err(eof())
diff --git a/tokio-io/src/io/write_all.rs b/tokio-io/src/io/write_all.rs
index d000f1b9..50b11fbc 100644
--- a/tokio-io/src/io/write_all.rs
+++ b/tokio-io/src/io/write_all.rs
@@ -68,7 +68,7 @@ impl<A, T> Future for WriteAll<A, T>
State::Writing { ref mut a, ref buf, ref mut pos } => {
let buf = buf.as_ref();
while *pos < buf.len() {
- let n = try_nb!(a.write(&buf[*pos..]));
+ let n = try_ready!(a.poll_write(&buf[*pos..]));
*pos += n;
if n == 0 {
return Err(zero_write())
diff --git a/tokio-io/src/length_delimited.rs b/tokio-io/src/length_delimited.rs
index 7de61a35..7e6635d1 100644
--- a/tokio-io/src/length_delimited.rs
+++ b/tokio-io/src/length_delimited.rs
@@ -511,7 +511,7 @@ impl<T: AsyncWrite, B: IntoBuf> Sink for FramedWrite<T, B> {
try_ready!(self.do_write());
// Try flushing the underlying IO
- try_nb!(self.inner.flush());
+ try_ready!(self.inner.poll_flush());
return Ok(Async::Ready(()));
}