diff options
author | Carl Lerche <me@carllerche.com> | 2018-03-02 15:15:05 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-02 15:15:05 -0800 |
commit | 21c0f3a9d814e37222a70b18c55921a2395804ee (patch) | |
tree | 6d513defb4ff76320983dcb2b2f2ea13f7fbe1a5 | |
parent | e1b30851537bccef87c37bd306a2661418083bc1 (diff) |
Add AsyncRead::poll_read, AsyncWrite::poll_write. (#170)
This removes the need for the `try_nb` macro as well as bring the traits
closer in line with the planed 0.2 iteration.
-rw-r--r-- | tokio-io/src/async_read.rs | 20 | ||||
-rw-r--r-- | tokio-io/src/async_write.rs | 42 | ||||
-rw-r--r-- | tokio-io/src/framed_write.rs | 4 | ||||
-rw-r--r-- | tokio-io/src/io/copy.rs | 6 | ||||
-rw-r--r-- | tokio-io/src/io/flush.rs | 2 | ||||
-rw-r--r-- | tokio-io/src/io/read.rs | 2 | ||||
-rw-r--r-- | tokio-io/src/io/read_exact.rs | 2 | ||||
-rw-r--r-- | tokio-io/src/io/write_all.rs | 2 | ||||
-rw-r--r-- | tokio-io/src/length_delimited.rs | 2 |
9 files changed, 68 insertions, 14 deletions
diff --git a/tokio-io/src/async_read.rs b/tokio-io/src/async_read.rs index 08d851ef..804aca7d 100644 --- a/tokio-io/src/async_read.rs +++ b/tokio-io/src/async_read.rs @@ -67,6 +67,24 @@ pub trait AsyncRead: std_io::Read { true } + /// Attempt to read from the `AsyncRead` into `buf`. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Ok(Async::Pending)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object becomes + /// readable or is closed. + fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std_io::Error> { + match self.read(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => { + return Ok(Async::NotReady) + } + Err(e) => return Err(e.into()), + } + } + /// Pull some bytes from this source into the specified `Buf`, returning /// how many bytes were read. /// @@ -86,7 +104,7 @@ pub trait AsyncRead: std_io::Read { self.prepare_uninitialized_buffer(b); - try_nb!(self.read(b)) + try_ready!(self.poll_read(b)) }; buf.advance_mut(n); diff --git a/tokio-io/src/async_write.rs b/tokio-io/src/async_write.rs index 8c27ee12..514a8ec1 100644 --- a/tokio-io/src/async_write.rs +++ b/tokio-io/src/async_write.rs @@ -1,5 +1,4 @@ use std::io as std_io; -use std::io::Write; use bytes::Buf; use futures::{Async, Poll}; @@ -35,6 +34,43 @@ use AsyncRead; /// current task is ready to receive a notification when flushing can make more /// progress, and otherwise normal errors can happen as well. pub trait AsyncWrite: std_io::Write { + /// Attempt to write bytes from `buf` into the object. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If the object is not ready for writing, the method returns + /// `Ok(Async::Pending)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object becomes + /// readable or is closed. + fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> { + match self.write(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => { + return Ok(Async::NotReady) + } + Err(e) => return Err(e.into()), + } + } + + /// Attempt to flush the object, ensuring that any buffered data reach + /// their destination. + /// + /// On success, returns `Ok(Async::Ready(()))`. + /// + /// If flushing cannot immediately complete, this method returns + /// `Ok(Async::Pending)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object can make + /// progress towards flushing. + fn poll_flush(&mut self) -> Poll<(), std_io::Error> { + match self.flush() { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => { + return Ok(Async::NotReady) + } + Err(e) => return Err(e.into()), + } + } + /// Initiates or attempts to shut down this writer, returning success when /// the I/O connection has completely shut down. /// @@ -106,7 +142,7 @@ pub trait AsyncWrite: std_io::Write { return Ok(Async::Ready(0)); } - let n = try_nb!(self.write(buf.bytes())); + let n = try_ready!(self.poll_write(buf.bytes())); buf.advance(n); Ok(Async::Ready(n)) } @@ -150,7 +186,7 @@ impl<T, U> AsyncRead for std_io::Chain<T, U> impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> { fn shutdown(&mut self) -> Poll<(), std_io::Error> { - try_nb!(self.flush()); + try_ready!(self.poll_flush()); self.get_mut().shutdown() } } diff --git a/tokio-io/src/framed_write.rs b/tokio-io/src/framed_write.rs index e16ad3f8..8c60426c 100644 --- a/tokio-io/src/framed_write.rs +++ b/tokio-io/src/framed_write.rs @@ -184,7 +184,7 @@ impl<T> Sink for FramedWrite2<T> while !self.buffer.is_empty() { trace!("writing; remaining={}", self.buffer.len()); - let n = try_nb!(self.inner.write(&self.buffer)); + let n = try_ready!(self.inner.poll_write(&self.buffer)); if n == 0 { return Err(io::Error::new(io::ErrorKind::WriteZero, "failed to @@ -197,7 +197,7 @@ impl<T> Sink for FramedWrite2<T> } // Try flushing the underlying IO - try_nb!(self.inner.flush()); + try_ready!(self.inner.poll_flush()); trace!("framed transport flushed"); return Ok(Async::Ready(())); diff --git a/tokio-io/src/io/copy.rs b/tokio-io/src/io/copy.rs index 8b8c0fe4..b21dc1c4 100644 --- a/tokio-io/src/io/copy.rs +++ b/tokio-io/src/io/copy.rs @@ -60,7 +60,7 @@ impl<R, W> Future for Copy<R, W> // continue. if self.pos == self.cap && !self.read_done { let reader = self.reader.as_mut().unwrap(); - let n = try_nb!(reader.read(&mut self.buf)); + let n = try_ready!(reader.poll_read(&mut self.buf)); if n == 0 { self.read_done = true; } else { @@ -72,7 +72,7 @@ impl<R, W> Future for Copy<R, W> // If our buffer has some data, let's write it out! while self.pos < self.cap { let writer = self.writer.as_mut().unwrap(); - let i = try_nb!(writer.write(&self.buf[self.pos..self.cap])); + let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap])); if i == 0 { return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero byte into writer")); @@ -86,7 +86,7 @@ impl<R, W> Future for Copy<R, W> // data and finish the transfer. // done with the entire transfer. if self.pos == self.cap && self.read_done { - try_nb!(self.writer.as_mut().unwrap().flush()); + try_ready!(self.writer.as_mut().unwrap().poll_flush()); let reader = self.reader.take().unwrap(); let writer = self.writer.take().unwrap(); return Ok((self.amt, reader, writer).into()) diff --git a/tokio-io/src/io/flush.rs b/tokio-io/src/io/flush.rs index 29065f9f..dabdc6c9 100644 --- a/tokio-io/src/io/flush.rs +++ b/tokio-io/src/io/flush.rs @@ -37,7 +37,7 @@ impl<A> Future for Flush<A> type Error = io::Error; fn poll(&mut self) -> Poll<A, io::Error> { - try_nb!(self.a.as_mut().unwrap().flush()); + try_ready!(self.a.as_mut().unwrap().poll_flush()); Ok(Async::Ready(self.a.take().unwrap())) } } diff --git a/tokio-io/src/io/read.rs b/tokio-io/src/io/read.rs index abfb459c..4c5a9665 100644 --- a/tokio-io/src/io/read.rs +++ b/tokio-io/src/io/read.rs @@ -44,7 +44,7 @@ impl<R, T> Future for Read<R, T> fn poll(&mut self) -> Poll<(R, T, usize), io::Error> { let nread = match self.state { - State::Pending { ref mut rd, ref mut buf } => try_nb!(rd.read(&mut buf.as_mut()[..])), + State::Pending { ref mut rd, ref mut buf } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])), State::Empty => panic!("poll a Read after it's done"), }; diff --git a/tokio-io/src/io/read_exact.rs b/tokio-io/src/io/read_exact.rs index 14251242..b1e16440 100644 --- a/tokio-io/src/io/read_exact.rs +++ b/tokio-io/src/io/read_exact.rs @@ -65,7 +65,7 @@ impl<A, T> Future for ReadExact<A, T> State::Reading { ref mut a, ref mut buf, ref mut pos } => { let buf = buf.as_mut(); while *pos < buf.len() { - let n = try_nb!(a.read(&mut buf[*pos..])); + let n = try_ready!(a.poll_read(&mut buf[*pos..])); *pos += n; if n == 0 { return Err(eof()) diff --git a/tokio-io/src/io/write_all.rs b/tokio-io/src/io/write_all.rs index d000f1b9..50b11fbc 100644 --- a/tokio-io/src/io/write_all.rs +++ b/tokio-io/src/io/write_all.rs @@ -68,7 +68,7 @@ impl<A, T> Future for WriteAll<A, T> State::Writing { ref mut a, ref buf, ref mut pos } => { let buf = buf.as_ref(); while *pos < buf.len() { - let n = try_nb!(a.write(&buf[*pos..])); + let n = try_ready!(a.poll_write(&buf[*pos..])); *pos += n; if n == 0 { return Err(zero_write()) diff --git a/tokio-io/src/length_delimited.rs b/tokio-io/src/length_delimited.rs index 7de61a35..7e6635d1 100644 --- a/tokio-io/src/length_delimited.rs +++ b/tokio-io/src/length_delimited.rs @@ -511,7 +511,7 @@ impl<T: AsyncWrite, B: IntoBuf> Sink for FramedWrite<T, B> { try_ready!(self.do_write()); // Try flushing the underlying IO - try_nb!(self.inner.flush()); + try_ready!(self.inner.poll_flush()); return Ok(Async::Ready(())); } |