diff options
Diffstat (limited to 'tokio/src/net')
-rw-r--r-- | tokio/src/net/tcp/listener.rs | 23 | ||||
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 73 | ||||
-rw-r--r-- | tokio/src/net/udp/socket.rs | 171 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs | 38 | ||||
-rw-r--r-- | tokio/src/net/unix/listener.rs | 32 | ||||
-rw-r--r-- | tokio/src/net/unix/stream.rs | 52 |
6 files changed, 145 insertions, 244 deletions
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index be528f2b..8b0a4803 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -1,4 +1,4 @@ -use crate::io::PollEvented; +use crate::io::{Interest, PollEvented}; use crate::net::tcp::TcpStream; use crate::net::{to_socket_addrs, ToSocketAddrs}; @@ -164,7 +164,8 @@ impl TcpListener { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { let (mio, addr) = self .io - .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .registration() + .async_io(Interest::READABLE, || self.io.accept()) .await?; let stream = TcpStream::new(mio)?; @@ -181,15 +182,15 @@ impl TcpListener { /// single task. Failing to do this could result in tasks hanging. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(TcpStream, SocketAddr)>> { loop { - let ev = ready!(self.io.poll_read_ready(cx))?; + let ev = ready!(self.io.registration().poll_read_ready(cx))?; - match self.io.get_ref().accept() { + match self.io.accept() { Ok((io, addr)) => { let io = TcpStream::new(io)?; return Poll::Ready(Ok((io, addr))); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); + self.io.registration().clear_readiness(ev); } Err(e) => return Poll::Ready(Err(e)), } @@ -266,7 +267,7 @@ impl TcpListener { /// } /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Gets the value of the `IP_TTL` option for this socket. @@ -293,7 +294,7 @@ impl TcpListener { /// } /// ``` pub fn ttl(&self) -> io::Result<u32> { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -318,7 +319,7 @@ impl TcpListener { /// } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } } @@ -346,7 +347,7 @@ impl TryFrom<net::TcpListener> for TcpListener { impl fmt::Debug for TcpListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -357,7 +358,7 @@ mod sys { impl AsRawFd for TcpListener { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -369,7 +370,7 @@ mod sys { impl AsRawSocket for TcpListener { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 045cb6c3..0a784b5f 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,12 +1,12 @@ use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; -use std::io::{self, Read, Write}; +use std::io; use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -129,9 +129,9 @@ impl TcpStream { // actually hit an error or not. // // If all that succeeded then we ship everything on up. - poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; - if let Some(e) = stream.io.get_ref().take_error()? { + if let Some(e) = stream.io.take_error()? { return Err(e); } @@ -193,7 +193,7 @@ impl TcpStream { /// # } /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Returns the remote address that this stream is connected to. @@ -211,7 +211,7 @@ impl TcpStream { /// # } /// ``` pub fn peer_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().peer_addr() + self.io.peer_addr() } /// Attempts to receive data on the socket, without removing that data from @@ -252,12 +252,12 @@ impl TcpStream { /// ``` pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { loop { - let ev = ready!(self.io.poll_read_ready(cx))?; + let ev = ready!(self.io.registration().poll_read_ready(cx))?; - match self.io.get_ref().peek(buf) { + match self.io.peek(buf) { Ok(ret) => return Poll::Ready(Ok(ret)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); + self.io.registration().clear_readiness(ev); } Err(e) => return Poll::Ready(Err(e)), } @@ -303,7 +303,8 @@ impl TcpStream { /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::READABLE, |io| io.peek(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.peek(buf)) .await } @@ -332,7 +333,7 @@ impl TcpStream { /// } /// ``` pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io.get_ref().shutdown(how) + self.io.shutdown(how) } /// Gets the value of the `TCP_NODELAY` option on this socket. @@ -354,7 +355,7 @@ impl TcpStream { /// # } /// ``` pub fn nodelay(&self) -> io::Result<bool> { - self.io.get_ref().nodelay() + self.io.nodelay() } /// Sets the value of the `TCP_NODELAY` option on this socket. @@ -378,7 +379,7 @@ impl TcpStream { /// # } /// ``` pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { - self.io.get_ref().set_nodelay(nodelay) + self.io.set_nodelay(nodelay) } /// Gets the value of the `IP_TTL` option for this socket. @@ -400,7 +401,7 @@ impl TcpStream { /// # } /// ``` pub fn ttl(&self) -> io::Result<u32> { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -421,7 +422,7 @@ impl TcpStream { /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } // These lifetime markers also appear in the generated documentation, and make @@ -469,29 +470,8 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - - // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes. - let b = unsafe { - &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) - }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Ok(n) => { - // Safety: We trust `TcpStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(())); - } - Err(e) => return Poll::Ready(Err(e)), - } - } + // Safety: `TcpStream::read` correctly handles reads into uninitialized memory + unsafe { self.io.poll_read(cx, buf) } } pub(super) fn poll_write_priv( @@ -499,16 +479,7 @@ impl TcpStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io.poll_write(cx, buf) } } @@ -559,7 +530,7 @@ impl AsyncWrite for TcpStream { impl fmt::Debug for TcpStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -570,7 +541,7 @@ mod sys { impl AsRawFd for TcpStream { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -582,7 +553,7 @@ mod sys { impl AsRawSocket for TcpStream { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index c68e37f8..f8b6a787 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -1,4 +1,4 @@ -use crate::io::{PollEvented, ReadBuf}; +use crate::io::{Interest, PollEvented, ReadBuf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; @@ -216,7 +216,7 @@ impl UdpSocket { /// # } /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Connects the UDP socket setting the default destination for send() and @@ -248,7 +248,7 @@ impl UdpSocket { let mut last_err = None; for addr in addrs { - match self.io.get_ref().connect(addr) { + match self.io.connect(addr) { Ok(_) => return Ok(()), Err(e) => last_err = Some(e), } @@ -271,7 +271,8 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send(buf)) .await } @@ -299,16 +300,9 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io + .registration() + .poll_write_io(cx, || self.io.send(buf)) } /// Try to send data on the socket to the remote address to which it is @@ -322,7 +316,7 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { - self.io.get_ref().send(buf) + self.io.send(buf) } /// Returns a future that receives a single datagram message on the socket from @@ -339,7 +333,8 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv(buf)) .await } @@ -367,29 +362,21 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let n = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().recv(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok(n) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(())); - } - } + + self.io.recv(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(())) } /// Returns a future that sends data on the socket to the given address. @@ -448,16 +435,9 @@ impl UdpSocket { buf: &[u8], target: &SocketAddr, ) -> Poll<io::Result<usize>> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, *target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io + .registration() + .poll_write_io(cx, || self.io.send_to(buf, *target)) } /// Try to send data on the socket to the given address, but if the send is blocked @@ -489,12 +469,13 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { - self.io.get_ref().send_to(buf, target) + self.io.send_to(buf, target) } async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send_to(buf, target)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send_to(buf, target)) .await } @@ -522,7 +503,8 @@ impl UdpSocket { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv_from(buf)) .await } @@ -548,29 +530,21 @@ impl UdpSocket { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().recv_from(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok((n, addr)) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(addr)); - } - } + + self.io.recv_from(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(addr)) } /// Receives data from the socket, without removing it from the input queue. @@ -602,7 +576,8 @@ impl UdpSocket { /// ``` pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.peek_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.peek_from(buf)) .await } @@ -637,29 +612,21 @@ impl UdpSocket { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().peek_from(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok((n, addr)) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(addr)); - } - } + + self.io.peek_from(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(addr)) } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -668,7 +635,7 @@ impl UdpSocket { /// /// [`set_broadcast`]: method@Self::set_broadcast pub fn broadcast(&self) -> io::Result<bool> { - self.io.get_ref().broadcast() + self.io.broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. @@ -676,7 +643,7 @@ impl UdpSocket { /// When enabled, this socket is allowed to send packets to a broadcast /// address. pub fn set_broadcast(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_broadcast(on) + self.io.set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -685,7 +652,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4 pub fn multicast_loop_v4(&self) -> io::Result<bool> { - self.io.get_ref().multicast_loop_v4() + self.io.multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -696,7 +663,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_multicast_loop_v4(on) + self.io.set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -705,7 +672,7 @@ impl UdpSocket { /// /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4 pub fn multicast_ttl_v4(&self) -> io::Result<u32> { - self.io.get_ref().multicast_ttl_v4() + self.io.multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -718,7 +685,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_multicast_ttl_v4(ttl) + self.io.set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -727,7 +694,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6 pub fn multicast_loop_v6(&self) -> io::Result<bool> { - self.io.get_ref().multicast_loop_v6() + self.io.multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -738,7 +705,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv4 sockets. pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_multicast_loop_v6(on) + self.io.set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. @@ -761,7 +728,7 @@ impl UdpSocket { /// # } /// ``` pub fn ttl(&self) -> io::Result<u32> { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -783,7 +750,7 @@ impl UdpSocket { /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. @@ -794,7 +761,7 @@ impl UdpSocket { /// multicast group. If it's equal to `INADDR_ANY` then an appropriate /// interface is chosen by the system. pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { - self.io.get_ref().join_multicast_v4(&multiaddr, &interface) + self.io.join_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. @@ -803,7 +770,7 @@ impl UdpSocket { /// The address must be a valid multicast address, and `interface` is the /// index of the interface to join/leave (or 0 to indicate any interface). pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.io.get_ref().join_multicast_v6(multiaddr, interface) + self.io.join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. @@ -812,7 +779,7 @@ impl UdpSocket { /// /// [`join_multicast_v4`]: method@Self::join_multicast_v4 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { - self.io.get_ref().leave_multicast_v4(&multiaddr, &interface) + self.io.leave_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. @@ -821,7 +788,7 @@ impl UdpSocket { /// /// [`join_multicast_v6`]: method@Self::join_multicast_v6 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.io.get_ref().leave_multicast_v6(multiaddr, interface) + self.io.leave_multicast_v6(multiaddr, interface) } /// Returns the value of the `SO_ERROR` option. @@ -844,7 +811,7 @@ impl UdpSocket { /// # } /// ``` pub fn take_error(&self) -> io::Result<Option<io::Error>> { - self.io.get_ref().take_error() + self.io.take_error() } } @@ -862,7 +829,7 @@ impl TryFrom<std::net::UdpSocket> for UdpSocket { impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -873,7 +840,7 @@ mod sys { impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -885,7 +852,7 @@ mod sys { impl AsRawSocket for UdpSocket { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 6215b579..f9e9321b 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,4 +1,4 @@ -use crate::io::PollEvented; +use crate::io::{Interest, PollEvented}; use crate::net::unix::SocketAddr; use std::convert::TryFrom; @@ -270,7 +270,7 @@ impl UnixDatagram { /// # } /// ``` pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> { - self.io.get_ref().connect(path) + self.io.connect(path) } /// Sends data on the socket to the socket's peer. @@ -301,7 +301,8 @@ impl UnixDatagram { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send(buf)) .await } @@ -330,7 +331,7 @@ impl UnixDatagram { /// # } /// ``` pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { - self.io.get_ref().send(buf) + self.io.send(buf) } /// Try to send a datagram to the peer without waiting. @@ -369,7 +370,7 @@ impl UnixDatagram { where P: AsRef<Path>, { - self.io.get_ref().send_to(buf, target) + self.io.send_to(buf, target) } /// Receives data from the socket. @@ -400,7 +401,8 @@ impl UnixDatagram { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv(buf)) .await } @@ -429,7 +431,7 @@ impl UnixDatagram { /// # } /// ``` pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> { - self.io.get_ref().recv(buf) + self.io.recv(buf) } /// Sends data on the socket to the specified address. @@ -470,9 +472,8 @@ impl UnixDatagram { P: AsRef<Path>, { self.io - .async_io(mio::Interest::WRITABLE, |sock| { - sock.send_to(buf, target.as_ref()) - }) + .registration() + .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref())) .await } @@ -512,7 +513,8 @@ impl UnixDatagram { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { let (n, addr) = self .io - .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv_from(buf)) .await?; Ok((n, SocketAddr(addr))) @@ -551,7 +553,7 @@ impl UnixDatagram { /// # } /// ``` pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - let (n, addr) = self.io.get_ref().recv_from(buf)?; + let (n, addr) = self.io.recv_from(buf)?; Ok((n, SocketAddr(addr))) } @@ -596,7 +598,7 @@ impl UnixDatagram { /// # } /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr().map(SocketAddr) + self.io.local_addr().map(SocketAddr) } /// Returns the address of this socket's peer. @@ -645,7 +647,7 @@ impl UnixDatagram { /// # } /// ``` pub fn peer_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().peer_addr().map(SocketAddr) + self.io.peer_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. @@ -668,7 +670,7 @@ impl UnixDatagram { /// # } /// ``` pub fn take_error(&self) -> io::Result<Option<io::Error>> { - self.io.get_ref().take_error() + self.io.take_error() } /// Shuts down the read, write, or both halves of this connection. @@ -704,7 +706,7 @@ impl UnixDatagram { /// # } /// ``` pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io.get_ref().shutdown(how) + self.io.shutdown(how) } } @@ -722,12 +724,12 @@ impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram { impl fmt::Debug for UnixDatagram { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } impl AsRawFd for UnixDatagram { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index 8f0d4c0b..b1da0e3c 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -1,4 +1,4 @@ -use crate::io::PollEvented; +use crate::io::{Interest, PollEvented}; use crate::net::unix::{SocketAddr, UnixStream}; use std::convert::TryFrom; @@ -90,19 +90,20 @@ impl UnixListener { /// Returns the local socket address of this listener. pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr().map(SocketAddr) + self.io.local_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. pub fn take_error(&self) -> io::Result<Option<io::Error>> { - self.io.get_ref().take_error() + self.io.take_error() } /// Accepts a new incoming connection to this listener. pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { let (mio, addr) = self .io - .async_io(mio::Interest::READABLE, |sock| sock.accept()) + .registration() + .async_io(Interest::READABLE, || self.io.accept()) .await?; let addr = SocketAddr(addr); @@ -119,21 +120,10 @@ impl UnixListener { /// The caller is responsible to ensure that `poll_accept` is called from a /// single task. Failing to do this could result in tasks hanging. pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - - match self.io.get_ref().accept() { - Ok((sock, addr)) => { - let addr = SocketAddr(addr); - let sock = UnixStream::new(sock)?; - return Poll::Ready(Ok((sock, addr))); - } - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(err) => return Err(err).into(), - } - } + let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?; + let addr = SocketAddr(addr); + let sock = UnixStream::new(sock)?; + Poll::Ready(Ok((sock, addr))) } } @@ -161,12 +151,12 @@ impl TryFrom<std::os::unix::net::UnixListener> for UnixListener { impl fmt::Debug for UnixListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } impl AsRawFd for UnixListener { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 2f3dd128..1d840926 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -7,7 +7,7 @@ use crate::net::unix::SocketAddr; use std::convert::TryFrom; use std::fmt; -use std::io::{self, Read, Write}; +use std::io; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net; @@ -39,7 +39,7 @@ impl UnixStream { let stream = mio::net::UnixStream::connect(path)?; let stream = UnixStream::new(stream)?; - poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; Ok(stream) } @@ -84,12 +84,12 @@ impl UnixStream { /// Returns the socket address of the local half of this connection. pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr().map(SocketAddr) + self.io.local_addr().map(SocketAddr) } /// Returns the socket address of the remote half of this connection. pub fn peer_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().peer_addr().map(SocketAddr) + self.io.peer_addr().map(SocketAddr) } /// Returns effective credentials of the process which called `connect` or `pair`. @@ -99,7 +99,7 @@ impl UnixStream { /// Returns the value of the `SO_ERROR` option. pub fn take_error(&self) -> io::Result<Option<io::Error>> { - self.io.get_ref().take_error() + self.io.take_error() } /// Shuts down the read, write, or both halves of this connection. @@ -108,7 +108,7 @@ impl UnixStream { /// specified portions to immediately return with an appropriate value /// (see the documentation of `Shutdown`). pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io.get_ref().shutdown(how) + self.io.shutdown(how) } // These lifetime markers also appear in the generated documentation, and make @@ -199,29 +199,8 @@ impl UnixStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - loop { - let ev = ready!(self.io.poll_re |