diff options
Diffstat (limited to 'tokio/src/net/tcp/stream.rs')
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 73 |
1 files changed, 22 insertions, 51 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 045cb6c3..0a784b5f 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,12 +1,12 @@ use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; -use std::io::{self, Read, Write}; +use std::io; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -129,9 +129,9 @@ impl TcpStream { // actually hit an error or not. // // If all that succeeded then we ship everything on up. - poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; - if let Some(e) = stream.io.get_ref().take_error()? { + if let Some(e) = stream.io.take_error()? { return Err(e); } @@ -193,7 +193,7 @@ impl TcpStream { /// # } /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Returns the remote address that this stream is connected to. @@ -211,7 +211,7 @@ impl TcpStream { /// # } /// ``` pub fn peer_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().peer_addr() + self.io.peer_addr() } /// Attempts to receive data on the socket, without removing that data from @@ -252,12 +252,12 @@ impl TcpStream { /// ``` pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { loop { - let ev = ready!(self.io.poll_read_ready(cx))?; + let ev = ready!(self.io.registration().poll_read_ready(cx))?; - match self.io.get_ref().peek(buf) { + match self.io.peek(buf) { Ok(ret) => return Poll::Ready(Ok(ret)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); + self.io.registration().clear_readiness(ev); } Err(e) => return Poll::Ready(Err(e)), } @@ -303,7 +303,8 @@ impl TcpStream { /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::READABLE, |io| io.peek(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.peek(buf)) .await } @@ -332,7 +333,7 @@ impl TcpStream { /// } /// ``` pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io.get_ref().shutdown(how) + self.io.shutdown(how) } /// Gets the value of the `TCP_NODELAY` option on this socket. @@ -354,7 +355,7 @@ impl TcpStream { /// # } /// ``` pub fn nodelay(&self) -> io::Result<bool> { - self.io.get_ref().nodelay() + self.io.nodelay() } /// Sets the value of the `TCP_NODELAY` option on this socket. @@ -378,7 +379,7 @@ impl TcpStream { /// # } /// ``` pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { - self.io.get_ref().set_nodelay(nodelay) + self.io.set_nodelay(nodelay) } /// Gets the value of the `IP_TTL` option for this socket. @@ -400,7 +401,7 @@ impl TcpStream { /// # } /// ``` pub fn ttl(&self) -> io::Result<u32> { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -421,7 +422,7 @@ impl TcpStream { /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } // These lifetime markers also appear in the generated documentation, and make @@ -469,29 +470,8 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - - // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes. - let b = unsafe { - &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) - }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Ok(n) => { - // Safety: We trust `TcpStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(())); - } - Err(e) => return Poll::Ready(Err(e)), - } - } + // Safety: `TcpStream::read` correctly handles reads into uninitialized memory + unsafe { self.io.poll_read(cx, buf) } } pub(super) fn poll_write_priv( @@ -499,16 +479,7 @@ impl TcpStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io.poll_write(cx, buf) } } @@ -559,7 +530,7 @@ impl AsyncWrite for TcpStream { impl fmt::Debug for TcpStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -570,7 +541,7 @@ mod sys { impl AsRawFd for TcpStream { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -582,7 +553,7 @@ mod sys { impl AsRawSocket for TcpStream { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } |