diff options
Diffstat (limited to 'tokio/src/net/unix/datagram/socket.rs')
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs | 227 |
1 files changed, 34 insertions, 193 deletions
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index ba3a10c4..78baf279 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,7 +1,4 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; -use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; use std::convert::TryFrom; use std::fmt; @@ -10,7 +7,6 @@ use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; -use std::task::{Context, Poll}; cfg_uds! { /// An I/O object representing a Unix datagram socket. @@ -38,9 +34,9 @@ cfg_uds! { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -64,7 +60,7 @@ cfg_uds! { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -128,7 +124,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -208,12 +204,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Send to the bound socket /// let bytes = b"hello world"; @@ -247,12 +243,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Connect to the bound socket /// tx.connect(&rx_path)?; @@ -284,7 +280,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -300,8 +296,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send a datagram to the peer without waiting. @@ -371,32 +369,6 @@ impl UnixDatagram { self.io.get_ref().send_to(buf, target) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Receives data from the socket. /// /// # Examples @@ -407,7 +379,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -423,8 +395,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .await } /// Try to receive a datagram from the peer without waiting. @@ -455,22 +429,6 @@ impl UnixDatagram { self.io.get_ref().recv(buf) } - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Sends data on the socket to the specified address. /// /// # Examples @@ -487,9 +445,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -504,28 +462,15 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> where - P: AsRef<Path> + Unpin, + P: AsRef<Path>, { - poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - pub(crate) fn poll_send_to_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &Path, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + self.io + .async_io(mio::Ready::writable(), |sock| { + sock.send_to(buf, target.as_ref()) + }) + .await } /// Receives data from the socket. @@ -544,9 +489,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -561,8 +506,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .await } /// Try to receive data from the socket without waiting. @@ -601,22 +548,6 @@ impl UnixDatagram { self.io.get_ref().recv_from(buf) } - pub(crate) fn poll_recv_from_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<(usize, SocketAddr), io::Error>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Returns the local address that this socket is bound to. /// /// # Examples @@ -748,7 +679,7 @@ impl UnixDatagram { /// use std::net::Shutdown; /// /// // Create an unbound socket - /// let (mut socket, other) = UnixDatagram::pair()?; + /// let (socket, other) = UnixDatagram::pair()?; /// /// socket.shutdown(Shutdown::Both)?; /// @@ -768,102 +699,12 @@ impl UnixDatagram { pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.get_ref().shutdown(how) } - - // These lifetime markers also appear in the generated documentation, and make - // it more clear that this is a *borrowed* split. - #[allow(clippy::needless_lifetimes)] - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// This method is more efficient than [`into_split`], but the halves cannot - /// be moved into independently spawned tasks. - /// - /// [`into_split`]: fn@crate::net::UnixDatagram::into_split - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box<dyn Error>> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) { - split(self) - } - - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// Unlike [`split`], the owned halves can be moved to separate tasks, - /// however this comes at the cost of a heap allocation. - /// - /// **Note:** Dropping the write half will shut down the write half of the - /// datagram. This is equivalent to calling [`shutdown(Write)`]. - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box<dyn Error>> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.into_split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - /// - /// [`split`]: fn@crate::net::UnixDatagram::split - /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown - pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { - split_owned(self) - } } impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> { value.io.into_inner() } |