summaryrefslogtreecommitdiffstats
path: root/tokio/src/net/tcp/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/net/tcp/stream.rs')
-rw-r--r--tokio/src/net/tcp/stream.rs73
1 files changed, 22 insertions, 51 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 045cb6c3..0a784b5f 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -1,12 +1,12 @@
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
use std::fmt;
-use std::io::{self, Read, Write};
+use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -129,9 +129,9 @@ impl TcpStream {
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
- poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
+ poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
- if let Some(e) = stream.io.get_ref().take_error()? {
+ if let Some(e) = stream.io.take_error()? {
return Err(e);
}
@@ -193,7 +193,7 @@ impl TcpStream {
/// # }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().local_addr()
+ self.io.local_addr()
}
/// Returns the remote address that this stream is connected to.
@@ -211,7 +211,7 @@ impl TcpStream {
/// # }
/// ```
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
- self.io.get_ref().peer_addr()
+ self.io.peer_addr()
}
/// Attempts to receive data on the socket, without removing that data from
@@ -252,12 +252,12 @@ impl TcpStream {
/// ```
pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
loop {
- let ev = ready!(self.io.poll_read_ready(cx))?;
+ let ev = ready!(self.io.registration().poll_read_ready(cx))?;
- match self.io.get_ref().peek(buf) {
+ match self.io.peek(buf) {
Ok(ret) => return Poll::Ready(Ok(ret)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_readiness(ev);
+ self.io.registration().clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
}
@@ -303,7 +303,8 @@ impl TcpStream {
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io
- .async_io(mio::Interest::READABLE, |io| io.peek(buf))
+ .registration()
+ .async_io(Interest::READABLE, || self.io.peek(buf))
.await
}
@@ -332,7 +333,7 @@ impl TcpStream {
/// }
/// ```
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
- self.io.get_ref().shutdown(how)
+ self.io.shutdown(how)
}
/// Gets the value of the `TCP_NODELAY` option on this socket.
@@ -354,7 +355,7 @@ impl TcpStream {
/// # }
/// ```
pub fn nodelay(&self) -> io::Result<bool> {
- self.io.get_ref().nodelay()
+ self.io.nodelay()
}
/// Sets the value of the `TCP_NODELAY` option on this socket.
@@ -378,7 +379,7 @@ impl TcpStream {
/// # }
/// ```
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
- self.io.get_ref().set_nodelay(nodelay)
+ self.io.set_nodelay(nodelay)
}
/// Gets the value of the `IP_TTL` option for this socket.
@@ -400,7 +401,7 @@ impl TcpStream {
/// # }
/// ```
pub fn ttl(&self) -> io::Result<u32> {
- self.io.get_ref().ttl()
+ self.io.ttl()
}
/// Sets the value for the `IP_TTL` option on this socket.
@@ -421,7 +422,7 @@ impl TcpStream {
/// # }
/// ```
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
- self.io.get_ref().set_ttl(ttl)
+ self.io.set_ttl(ttl)
}
// These lifetime markers also appear in the generated documentation, and make
@@ -469,29 +470,8 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
- loop {
- let ev = ready!(self.io.poll_read_ready(cx))?;
-
- // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes.
- let b = unsafe {
- &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
- };
- match self.io.get_ref().read(b) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_readiness(ev);
- }
- Ok(n) => {
- // Safety: We trust `TcpStream::read` to have filled up `n` bytes
- // in the buffer.
- unsafe {
- buf.assume_init(n);
- }
- buf.advance(n);
- return Poll::Ready(Ok(()));
- }
- Err(e) => return Poll::Ready(Err(e)),
- }
- }
+ // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
+ unsafe { self.io.poll_read(cx, buf) }
}
pub(super) fn poll_write_priv(
@@ -499,16 +479,7 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- loop {
- let ev = ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().write(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_readiness(ev);
- }
- x => return Poll::Ready(x),
- }
- }
+ self.io.poll_write(cx, buf)
}
}
@@ -559,7 +530,7 @@ impl AsyncWrite for TcpStream {
impl fmt::Debug for TcpStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.io.get_ref().fmt(f)
+ self.io.fmt(f)
}
}
@@ -570,7 +541,7 @@ mod sys {
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
- self.io.get_ref().as_raw_fd()
+ self.io.as_raw_fd()
}
}
}
@@ -582,7 +553,7 @@ mod sys {
impl AsRawSocket for TcpStream {
fn as_raw_socket(&self) -> RawSocket {
- self.io.get_ref().as_raw_socket()
+ self.io.as_raw_socket()
}
}
}