diff options
Diffstat (limited to 'tokio-tcp/src/stream.rs')
-rw-r--r-- | tokio-tcp/src/stream.rs | 175 |
1 files changed, 55 insertions, 120 deletions
diff --git a/tokio-tcp/src/stream.rs b/tokio-tcp/src/stream.rs index 1a00679d..46f17837 100644 --- a/tokio-tcp/src/stream.rs +++ b/tokio-tcp/src/stream.rs @@ -1,11 +1,13 @@ use bytes::{Buf, BufMut}; -use futures::{try_ready, Async, Future, Poll}; use iovec::IoVec; use mio; use std::fmt; -use std::io::{self, Read, Write}; +use std::future::Future; +use std::io; use std::mem; use std::net::{self, Shutdown, SocketAddr}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_reactor::{Handle, PollEvented}; @@ -42,13 +44,10 @@ pub struct TcpStream { /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` /// when the stream is connected. #[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct ConnectFuture { +struct ConnectFuture { inner: ConnectFutureState, } -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] enum ConnectFutureState { Waiting(TcpStream), Error(io::Error), @@ -76,7 +75,7 @@ impl TcpStream { /// println!("successfully connected to {}", stream.local_addr().unwrap())); /// # Ok::<_, Box<dyn std::error::Error>>(()) /// ``` - pub fn connect(addr: &SocketAddr) -> ConnectFuture { + pub fn connect(addr: &SocketAddr) -> impl Future<Output = io::Result<TcpStream>> { use self::ConnectFutureState::*; let inner = match mio::net::TcpStream::connect(addr) { @@ -138,7 +137,7 @@ impl TcpStream { stream: net::TcpStream, addr: &SocketAddr, handle: &Handle, - ) -> ConnectFuture { + ) -> impl Future<Output = io::Result<TcpStream>> { use self::ConnectFutureState::*; let io = mio::net::TcpStream::connect_stream(stream, addr) @@ -193,8 +192,8 @@ impl TcpStream { /// }); /// # Ok::<_, Box<dyn std::error::Error>>(()) /// ``` - pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> { - self.io.poll_read_ready(mask) + pub fn poll_read_ready(&self, cx: &mut Context<'_>, mask: mio::Ready) -> Poll<io::Result<mio::Ready>> { + self.io.poll_read_ready(cx, mask) } /// Check the TCP stream's write readiness state. @@ -232,8 +231,8 @@ impl TcpStream { /// }); /// # Ok::<_, Box<dyn std::error::Error>>(()) /// ``` - pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { - self.io.poll_write_ready() + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { + self.io.poll_write_ready(cx) } /// Returns the local address that this stream is bound to. @@ -279,15 +278,6 @@ impl TcpStream { self.io.get_ref().peer_addr() } - #[deprecated(since = "0.1.2", note = "use poll_peek instead")] - #[doc(hidden)] - pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { - match self.poll_peek(buf)? { - Async::Ready(n) => Ok(n), - Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), - } - } - /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. @@ -328,16 +318,16 @@ impl TcpStream { /// }); /// # Ok::<_, Box<dyn std::error::Error>>(()) /// ``` - pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { - try_ready!(self.io.poll_read_ready(mio::Ready::readable())); + pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().peek(buf) { - Ok(ret) => Ok(ret.into()), + Ok(ret) => Poll::Ready(Ok(ret)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(mio::Ready::readable())?; - Ok(Async::NotReady) + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending } - Err(e) => Err(e), + Err(e) => Poll::Ready(Err(e)), } } @@ -721,68 +711,17 @@ impl TcpStream { // ===== impl Read / Write ===== -impl Read for TcpStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.io.read(buf) - } -} - -impl Write for TcpStream { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - self.io.write(buf) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - impl AsyncRead for TcpStream { unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { false } - fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { - <&TcpStream>::read_buf(&mut &*self, buf) - } -} - -impl AsyncWrite for TcpStream { - fn shutdown(&mut self) -> Poll<(), io::Error> { - <&TcpStream>::shutdown(&mut &*self) - } - - fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { - <&TcpStream>::write_buf(&mut &*self, buf) - } -} - -// ===== impl Read / Write for &'a ===== - -impl<'a> Read for &'a TcpStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - (&self.io).read(buf) - } -} - -impl<'a> Write for &'a TcpStream { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - (&self.io).write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - (&self.io).flush() + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { + Pin::new(&mut self.io).poll_read(cx, buf) } -} -impl<'a> AsyncRead for &'a TcpStream { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { - false - } - - fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { - if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? { - return Ok(Async::NotReady); - } + fn poll_read_buf<B: BufMut>(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll<io::Result<usize>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; let r = unsafe { // The `IoVec` type can't have a 0-length size, so we create a bunch @@ -831,26 +770,34 @@ impl<'a> AsyncRead for &'a TcpStream { unsafe { buf.advance_mut(n); } - Ok(Async::Ready(n)) + Poll::Ready(Ok(n)) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(mio::Ready::readable())?; - Ok(Async::NotReady) + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending } - Err(e) => Err(e), + Err(e) => Poll::Ready(Err(e)), } } } -impl<'a> AsyncWrite for &'a TcpStream { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(().into()) +impl AsyncWrite for TcpStream { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { + Pin::new(&mut self.io).poll_write(cx, buf) } - fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { - if let Async::NotReady = self.io.poll_write_ready()? { - return Ok(Async::NotReady); - } + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + // tcp flush is a no-op + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_write_buf<B: Buf>(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll<io::Result<usize>> { + ready!(self.io.poll_write_ready(cx))?; let r = { // The `IoVec` type can't have a zero-length size, so create a dummy @@ -865,13 +812,13 @@ impl<'a> AsyncWrite for &'a TcpStream { match r { Ok(n) => { buf.advance(n); - Ok(Async::Ready(n)) + Poll::Ready(Ok(n)) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready()?; - Ok(Async::NotReady) + self.io.clear_write_ready(cx)?; + Poll::Pending } - Err(e) => Err(e), + Err(e) => Poll::Ready(Err(e)), } } } @@ -883,18 +830,17 @@ impl fmt::Debug for TcpStream { } impl Future for ConnectFuture { - type Item = TcpStream; - type Error = io::Error; + type Output = io::Result<TcpStream>; - fn poll(&mut self) -> Poll<TcpStream, io::Error> { - self.inner.poll() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<TcpStream>> { + self.inner.poll_inner(|io| io.poll_write_ready(cx)) } } impl ConnectFutureState { - fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> + fn poll_inner<F>(&mut self, f: F) -> Poll<io::Result<TcpStream>> where - F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>, + F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<io::Result<mio::Ready>>, { { let stream = match *self { @@ -902,9 +848,9 @@ impl ConnectFutureState { ConnectFutureState::Error(_) => { let e = match mem::replace(self, ConnectFutureState::Empty) { ConnectFutureState::Error(e) => e, - _ => panic!(), + _ => unreachable!(), }; - return Err(e); + return Poll::Ready(Err(e)); } ConnectFutureState::Empty => panic!("can't poll TCP stream twice"), }; @@ -915,31 +861,20 @@ impl ConnectFutureState { // actually hit an error or not. // // If all that succeeded then we ship everything on up. - if let Async::NotReady = f(&mut stream.io)? { - return Ok(Async::NotReady); - } + ready!(f(&mut stream.io))?; if let Some(e) = stream.io.get_ref().take_error()? { - return Err(e); + return Poll::Ready(Err(e)); } } match mem::replace(self, ConnectFutureState::Empty) { - ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)), - _ => panic!(), + ConnectFutureState::Waiting(stream) => Poll::Ready(Ok(stream)), + _ => unreachable!(), } } } -impl Future for ConnectFutureState { - type Item = TcpStream; - type Error = io::Error; - - fn poll(&mut self) -> Poll<TcpStream, io::Error> { - self.poll_inner(|io| io.poll_write_ready()) - } -} - #[cfg(unix)] mod sys { use super::TcpStream; |