diff options
Diffstat (limited to 'tokio/src/net/udp/socket.rs')
-rw-r--r-- | tokio/src/net/udp/socket.rs | 171 |
1 files changed, 69 insertions, 102 deletions
diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index c68e37f8..f8b6a787 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -1,4 +1,4 @@ -use crate::io::{PollEvented, ReadBuf}; +use crate::io::{Interest, PollEvented, ReadBuf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; @@ -216,7 +216,7 @@ impl UdpSocket { /// # } /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.get_ref().local_addr() + self.io.local_addr() } /// Connects the UDP socket setting the default destination for send() and @@ -248,7 +248,7 @@ impl UdpSocket { let mut last_err = None; for addr in addrs { - match self.io.get_ref().connect(addr) { + match self.io.connect(addr) { Ok(_) => return Ok(()), Err(e) => last_err = Some(e), } @@ -271,7 +271,8 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send(buf)) .await } @@ -299,16 +300,9 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io + .registration() + .poll_write_io(cx, || self.io.send(buf)) } /// Try to send data on the socket to the remote address to which it is @@ -322,7 +316,7 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { - self.io.get_ref().send(buf) + self.io.send(buf) } /// Returns a future that receives a single datagram message on the socket from @@ -339,7 +333,8 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv(buf)) .await } @@ -367,29 +362,21 @@ impl UdpSocket { /// /// [`connect`]: method@Self::connect pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let n = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().recv(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok(n) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(())); - } - } + + self.io.recv(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(())) } /// Returns a future that sends data on the socket to the given address. @@ -448,16 +435,9 @@ impl UdpSocket { buf: &[u8], target: &SocketAddr, ) -> Poll<io::Result<usize>> { - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, *target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - x => return Poll::Ready(x), - } - } + self.io + .registration() + .poll_write_io(cx, || self.io.send_to(buf, *target)) } /// Try to send data on the socket to the given address, but if the send is blocked @@ -489,12 +469,13 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { - self.io.get_ref().send_to(buf, target) + self.io.send_to(buf, target) } async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { self.io - .async_io(mio::Interest::WRITABLE, |sock| sock.send_to(buf, target)) + .registration() + .async_io(Interest::WRITABLE, || self.io.send_to(buf, target)) .await } @@ -522,7 +503,8 @@ impl UdpSocket { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.recv_from(buf)) .await } @@ -548,29 +530,21 @@ impl UdpSocket { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().recv_from(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok((n, addr)) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(addr)); - } - } + + self.io.recv_from(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(addr)) } /// Receives data from the socket, without removing it from the input queue. @@ -602,7 +576,8 @@ impl UdpSocket { /// ``` pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Interest::READABLE, |sock| sock.peek_from(buf)) + .registration() + .async_io(Interest::READABLE, || self.io.peek_from(buf)) .await } @@ -637,29 +612,21 @@ impl UdpSocket { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>> { - loop { - let ev = ready!(self.io.poll_read_ready(cx))?; - + let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { // Safety: will not read the maybe uinitialized bytes. let b = unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().peek_from(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - Ok((n, addr)) => { - // Safety: We trust `recv` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - return Poll::Ready(Ok(addr)); - } - } + + self.io.peek_from(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); } + buf.advance(n); + Poll::Ready(Ok(addr)) } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -668,7 +635,7 @@ impl UdpSocket { /// /// [`set_broadcast`]: method@Self::set_broadcast pub fn broadcast(&self) -> io::Result<bool> { - self.io.get_ref().broadcast() + self.io.broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. @@ -676,7 +643,7 @@ impl UdpSocket { /// When enabled, this socket is allowed to send packets to a broadcast /// address. pub fn set_broadcast(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_broadcast(on) + self.io.set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -685,7 +652,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4 pub fn multicast_loop_v4(&self) -> io::Result<bool> { - self.io.get_ref().multicast_loop_v4() + self.io.multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -696,7 +663,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_multicast_loop_v4(on) + self.io.set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -705,7 +672,7 @@ impl UdpSocket { /// /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4 pub fn multicast_ttl_v4(&self) -> io::Result<u32> { - self.io.get_ref().multicast_ttl_v4() + self.io.multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -718,7 +685,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_multicast_ttl_v4(ttl) + self.io.set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -727,7 +694,7 @@ impl UdpSocket { /// /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6 pub fn multicast_loop_v6(&self) -> io::Result<bool> { - self.io.get_ref().multicast_loop_v6() + self.io.multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -738,7 +705,7 @@ impl UdpSocket { /// /// This may not have any affect on IPv4 sockets. pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { - self.io.get_ref().set_multicast_loop_v6(on) + self.io.set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. @@ -761,7 +728,7 @@ impl UdpSocket { /// # } /// ``` pub fn ttl(&self) -> io::Result<u32> { - self.io.get_ref().ttl() + self.io.ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -783,7 +750,7 @@ impl UdpSocket { /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.io.get_ref().set_ttl(ttl) + self.io.set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. @@ -794,7 +761,7 @@ impl UdpSocket { /// multicast group. If it's equal to `INADDR_ANY` then an appropriate /// interface is chosen by the system. pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { - self.io.get_ref().join_multicast_v4(&multiaddr, &interface) + self.io.join_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. @@ -803,7 +770,7 @@ impl UdpSocket { /// The address must be a valid multicast address, and `interface` is the /// index of the interface to join/leave (or 0 to indicate any interface). pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.io.get_ref().join_multicast_v6(multiaddr, interface) + self.io.join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. @@ -812,7 +779,7 @@ impl UdpSocket { /// /// [`join_multicast_v4`]: method@Self::join_multicast_v4 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { - self.io.get_ref().leave_multicast_v4(&multiaddr, &interface) + self.io.leave_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. @@ -821,7 +788,7 @@ impl UdpSocket { /// /// [`join_multicast_v6`]: method@Self::join_multicast_v6 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.io.get_ref().leave_multicast_v6(multiaddr, interface) + self.io.leave_multicast_v6(multiaddr, interface) } /// Returns the value of the `SO_ERROR` option. @@ -844,7 +811,7 @@ impl UdpSocket { /// # } /// ``` pub fn take_error(&self) -> io::Result<Option<io::Error>> { - self.io.get_ref().take_error() + self.io.take_error() } } @@ -862,7 +829,7 @@ impl TryFrom<std::net::UdpSocket> for UdpSocket { impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } @@ -873,7 +840,7 @@ mod sys { impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() + self.io.as_raw_fd() } } } @@ -885,7 +852,7 @@ mod sys { impl AsRawSocket for UdpSocket { fn as_raw_socket(&self) -> RawSocket { - self.io.get_ref().as_raw_socket() + self.io.as_raw_socket() } } } |