summaryrefslogtreecommitdiffstats
path: root/tokio-tcp/src/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio-tcp/src/stream.rs')
-rw-r--r--tokio-tcp/src/stream.rs175
1 files changed, 55 insertions, 120 deletions
diff --git a/tokio-tcp/src/stream.rs b/tokio-tcp/src/stream.rs
index 1a00679d..46f17837 100644
--- a/tokio-tcp/src/stream.rs
+++ b/tokio-tcp/src/stream.rs
@@ -1,11 +1,13 @@
use bytes::{Buf, BufMut};
-use futures::{try_ready, Async, Future, Poll};
use iovec::IoVec;
use mio;
use std::fmt;
-use std::io::{self, Read, Write};
+use std::future::Future;
+use std::io;
use std::mem;
use std::net::{self, Shutdown, SocketAddr};
+use std::pin::Pin;
+use std::task::{Context, Poll};
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::{Handle, PollEvented};
@@ -42,13 +44,10 @@ pub struct TcpStream {
/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
/// when the stream is connected.
#[must_use = "futures do nothing unless polled"]
-#[derive(Debug)]
-pub struct ConnectFuture {
+struct ConnectFuture {
inner: ConnectFutureState,
}
-#[must_use = "futures do nothing unless polled"]
-#[derive(Debug)]
enum ConnectFutureState {
Waiting(TcpStream),
Error(io::Error),
@@ -76,7 +75,7 @@ impl TcpStream {
/// println!("successfully connected to {}", stream.local_addr().unwrap()));
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// ```
- pub fn connect(addr: &SocketAddr) -> ConnectFuture {
+ pub fn connect(addr: &SocketAddr) -> impl Future<Output = io::Result<TcpStream>> {
use self::ConnectFutureState::*;
let inner = match mio::net::TcpStream::connect(addr) {
@@ -138,7 +137,7 @@ impl TcpStream {
stream: net::TcpStream,
addr: &SocketAddr,
handle: &Handle,
- ) -> ConnectFuture {
+ ) -> impl Future<Output = io::Result<TcpStream>> {
use self::ConnectFutureState::*;
let io = mio::net::TcpStream::connect_stream(stream, addr)
@@ -193,8 +192,8 @@ impl TcpStream {
/// });
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// ```
- pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
- self.io.poll_read_ready(mask)
+ pub fn poll_read_ready(&self, cx: &mut Context<'_>, mask: mio::Ready) -> Poll<io::Result<mio::Ready>> {
+ self.io.poll_read_ready(cx, mask)
}
/// Check the TCP stream's write readiness state.
@@ -232,8 +231,8 @@ impl TcpStream {
/// });
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// ```
- pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
- self.io.poll_write_ready()
+ pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
+ self.io.poll_write_ready(cx)
}
/// Returns the local address that this stream is bound to.
@@ -279,15 +278,6 @@ impl TcpStream {
self.io.get_ref().peer_addr()
}
- #[deprecated(since = "0.1.2", note = "use poll_peek instead")]
- #[doc(hidden)]
- pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- match self.poll_peek(buf)? {
- Async::Ready(n) => Ok(n),
- Async::NotReady => Err(io::ErrorKind::WouldBlock.into()),
- }
- }
-
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
@@ -328,16 +318,16 @@ impl TcpStream {
/// });
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// ```
- pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
- try_ready!(self.io.poll_read_ready(mio::Ready::readable()));
+ pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
+ ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().peek(buf) {
- Ok(ret) => Ok(ret.into()),
+ Ok(ret) => Poll::Ready(Ok(ret)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(mio::Ready::readable())?;
- Ok(Async::NotReady)
+ self.io.clear_read_ready(cx, mio::Ready::readable())?;
+ Poll::Pending
}
- Err(e) => Err(e),
+ Err(e) => Poll::Ready(Err(e)),
}
}
@@ -721,68 +711,17 @@ impl TcpStream {
// ===== impl Read / Write =====
-impl Read for TcpStream {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.io.read(buf)
- }
-}
-
-impl Write for TcpStream {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.io.write(buf)
- }
- fn flush(&mut self) -> io::Result<()> {
- Ok(())
- }
-}
-
impl AsyncRead for TcpStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
- fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
- <&TcpStream>::read_buf(&mut &*self, buf)
- }
-}
-
-impl AsyncWrite for TcpStream {
- fn shutdown(&mut self) -> Poll<(), io::Error> {
- <&TcpStream>::shutdown(&mut &*self)
- }
-
- fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
- <&TcpStream>::write_buf(&mut &*self, buf)
- }
-}
-
-// ===== impl Read / Write for &'a =====
-
-impl<'a> Read for &'a TcpStream {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- (&self.io).read(buf)
- }
-}
-
-impl<'a> Write for &'a TcpStream {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- (&self.io).write(buf)
- }
-
- fn flush(&mut self) -> io::Result<()> {
- (&self.io).flush()
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.io).poll_read(cx, buf)
}
-}
-impl<'a> AsyncRead for &'a TcpStream {
- unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
- false
- }
-
- fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
- if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? {
- return Ok(Async::NotReady);
- }
+ fn poll_read_buf<B: BufMut>(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll<io::Result<usize>> {
+ ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
let r = unsafe {
// The `IoVec` type can't have a 0-length size, so we create a bunch
@@ -831,26 +770,34 @@ impl<'a> AsyncRead for &'a TcpStream {
unsafe {
buf.advance_mut(n);
}
- Ok(Async::Ready(n))
+ Poll::Ready(Ok(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(mio::Ready::readable())?;
- Ok(Async::NotReady)
+ self.io.clear_read_ready(cx, mio::Ready::readable())?;
+ Poll::Pending
}
- Err(e) => Err(e),
+ Err(e) => Poll::Ready(Err(e)),
}
}
}
-impl<'a> AsyncWrite for &'a TcpStream {
- fn shutdown(&mut self) -> Poll<(), io::Error> {
- Ok(().into())
+impl AsyncWrite for TcpStream {
+ fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.io).poll_write(cx, buf)
}
- fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
- if let Async::NotReady = self.io.poll_write_ready()? {
- return Ok(Async::NotReady);
- }
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ // tcp flush is a no-op
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_write_buf<B: Buf>(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B) -> Poll<io::Result<usize>> {
+ ready!(self.io.poll_write_ready(cx))?;
let r = {
// The `IoVec` type can't have a zero-length size, so create a dummy
@@ -865,13 +812,13 @@ impl<'a> AsyncWrite for &'a TcpStream {
match r {
Ok(n) => {
buf.advance(n);
- Ok(Async::Ready(n))
+ Poll::Ready(Ok(n))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready()?;
- Ok(Async::NotReady)
+ self.io.clear_write_ready(cx)?;
+ Poll::Pending
}
- Err(e) => Err(e),
+ Err(e) => Poll::Ready(Err(e)),
}
}
}
@@ -883,18 +830,17 @@ impl fmt::Debug for TcpStream {
}
impl Future for ConnectFuture {
- type Item = TcpStream;
- type Error = io::Error;
+ type Output = io::Result<TcpStream>;
- fn poll(&mut self) -> Poll<TcpStream, io::Error> {
- self.inner.poll()
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<TcpStream>> {
+ self.inner.poll_inner(|io| io.poll_write_ready(cx))
}
}
impl ConnectFutureState {
- fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error>
+ fn poll_inner<F>(&mut self, f: F) -> Poll<io::Result<TcpStream>>
where
- F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error>,
+ F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<io::Result<mio::Ready>>,
{
{
let stream = match *self {
@@ -902,9 +848,9 @@ impl ConnectFutureState {
ConnectFutureState::Error(_) => {
let e = match mem::replace(self, ConnectFutureState::Empty) {
ConnectFutureState::Error(e) => e,
- _ => panic!(),
+ _ => unreachable!(),
};
- return Err(e);
+ return Poll::Ready(Err(e));
}
ConnectFutureState::Empty => panic!("can't poll TCP stream twice"),
};
@@ -915,31 +861,20 @@ impl ConnectFutureState {
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
- if let Async::NotReady = f(&mut stream.io)? {
- return Ok(Async::NotReady);
- }
+ ready!(f(&mut stream.io))?;
if let Some(e) = stream.io.get_ref().take_error()? {
- return Err(e);
+ return Poll::Ready(Err(e));
}
}
match mem::replace(self, ConnectFutureState::Empty) {
- ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)),
- _ => panic!(),
+ ConnectFutureState::Waiting(stream) => Poll::Ready(Ok(stream)),
+ _ => unreachable!(),
}
}
}
-impl Future for ConnectFutureState {
- type Item = TcpStream;
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<TcpStream, io::Error> {
- self.poll_inner(|io| io.poll_write_ready())
- }
-}
-
#[cfg(unix)]
mod sys {
use super::TcpStream;