diff options
Diffstat (limited to 'tokio/src/net/udp/socket.rs')
-rw-r--r-- | tokio/src/net/udp/socket.rs | 116 |
1 files changed, 20 insertions, 96 deletions
diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index aeb25fb3..d0dece3e 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -1,13 +1,10 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::udp::split::{split, RecvHalf, SendHalf}; use crate::net::ToSocketAddrs; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::task::{Context, Poll}; cfg_udp! { /// A UDP socket @@ -67,15 +64,7 @@ impl UdpSocket { /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { let io = mio::net::UdpSocket::from_socket(socket)?; - let io = PollEvented::new(io)?; - Ok(UdpSocket { io }) - } - - /// Splits the `UdpSocket` into a receive half and a send half. The two parts - /// can be used to receive and send datagrams concurrently, even from two - /// different tasks. - pub fn split(self) -> (RecvHalf, SendHalf) { - split(self) + UdpSocket::new(io) } /// Returns the local address that this socket is bound to. @@ -112,8 +101,10 @@ impl UdpSocket { /// will resolve to an error if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send data on the socket to the remote address to which it is @@ -130,29 +121,6 @@ impl UdpSocket { self.io.get_ref().send(buf) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - #[doc(hidden)] - pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Returns a future that receives a single datagram message on the socket from /// the remote address to which it is connected. On success, the future will resolve /// to the number of bytes read. @@ -165,21 +133,10 @@ impl UdpSocket { /// will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .await } /// Returns a future that sends data on the socket to the given address. @@ -187,11 +144,11 @@ impl UdpSocket { /// /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. - pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> { + pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> { let mut addrs = target.to_socket_addrs().await?; match addrs.next() { - Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, + Some(target) => self.send_to_addr(buf, &target).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -214,23 +171,10 @@ impl UdpSocket { self.io.get_ref().send_to(buf, &target) } - // TODO: Public or not? - #[doc(hidden)] - pub fn poll_send_to( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &SocketAddr, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send_to(buf, target)) + .await } /// Returns a future that receives a single datagram on the socket. On success, @@ -239,25 +183,10 @@ impl UdpSocket { /// The function must be called with valid byte array `buf` of sufficient size /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv_from( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<(usize, SocketAddr), io::Error>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .await } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -399,11 +328,6 @@ impl TryFrom<UdpSocket> for mio::net::UdpSocket { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UdpSocket) -> Result<Self, Self::Error> { value.io.into_inner() } @@ -423,7 +347,7 @@ impl TryFrom<net::UdpSocket> for UdpSocket { impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } |