diff options
Diffstat (limited to 'tokio/src/net/unix/datagram')
-rw-r--r-- | tokio/src/net/unix/datagram/mod.rs | 5 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs | 227 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split.rs | 68 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split_owned.rs | 148 |
4 files changed, 34 insertions, 414 deletions
diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs index f484ae34..6268b4ac 100644 --- a/tokio/src/net/unix/datagram/mod.rs +++ b/tokio/src/net/unix/datagram/mod.rs @@ -1,8 +1,3 @@ //! Unix datagram types. pub(crate) mod socket; -pub(crate) mod split; -pub(crate) mod split_owned; - -pub use split::{RecvHalf, SendHalf}; -pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index ba3a10c4..78baf279 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,7 +1,4 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; -use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; use std::convert::TryFrom; use std::fmt; @@ -10,7 +7,6 @@ use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; -use std::task::{Context, Poll}; cfg_uds! { /// An I/O object representing a Unix datagram socket. @@ -38,9 +34,9 @@ cfg_uds! { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -64,7 +60,7 @@ cfg_uds! { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -128,7 +124,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -208,12 +204,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Send to the bound socket /// let bytes = b"hello world"; @@ -247,12 +243,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Connect to the bound socket /// tx.connect(&rx_path)?; @@ -284,7 +280,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -300,8 +296,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send a datagram to the peer without waiting. @@ -371,32 +369,6 @@ impl UnixDatagram { self.io.get_ref().send_to(buf, target) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Receives data from the socket. /// /// # Examples @@ -407,7 +379,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -423,8 +395,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .await } /// Try to receive a datagram from the peer without waiting. @@ -455,22 +429,6 @@ impl UnixDatagram { self.io.get_ref().recv(buf) } - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Sends data on the socket to the specified address. /// /// # Examples @@ -487,9 +445,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -504,28 +462,15 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> where - P: AsRef<Path> + Unpin, + P: AsRef<Path>, { - poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - pub(crate) fn poll_send_to_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &Path, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + self.io + .async_io(mio::Ready::writable(), |sock| { + sock.send_to(buf, target.as_ref()) + }) + .await } /// Receives data from the socket. @@ -544,9 +489,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -561,8 +506,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .await } /// Try to receive data from the socket without waiting. @@ -601,22 +548,6 @@ impl UnixDatagram { self.io.get_ref().recv_from(buf) } - pub(crate) fn poll_recv_from_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<(usize, SocketAddr), io::Error>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Returns the local address that this socket is bound to. /// /// # Examples @@ -748,7 +679,7 @@ impl UnixDatagram { /// use std::net::Shutdown; /// /// // Create an unbound socket - /// let (mut socket, other) = UnixDatagram::pair()?; + /// let (socket, other) = UnixDatagram::pair()?; /// /// socket.shutdown(Shutdown::Both)?; /// @@ -768,102 +699,12 @@ impl UnixDatagram { pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.get_ref().shutdown(how) } - - // These lifetime markers also appear in the generated documentation, and make - // it more clear that this is a *borrowed* split. - #[allow(clippy::needless_lifetimes)] - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// This method is more efficient than [`into_split`], but the halves cannot - /// be moved into independently spawned tasks. - /// - /// [`into_split`]: fn@crate::net::UnixDatagram::into_split - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box<dyn Error>> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) { - split(self) - } - - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// Unlike [`split`], the owned halves can be moved to separate tasks, - /// however this comes at the cost of a heap allocation. - /// - /// **Note:** Dropping the write half will shut down the write half of the - /// datagram. This is equivalent to calling [`shutdown(Write)`]. - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box<dyn Error>> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.into_split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - /// - /// [`split`]: fn@crate::net::UnixDatagram::split - /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown - pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { - split_owned(self) - } } impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> { value.io.into_inner() } diff --git a/tokio/src/net/unix/datagram/split.rs b/tokio/src/net/unix/datagram/split.rs deleted file mode 100644 index e42eeda8..00000000 --- a/tokio/src/net/unix/datagram/split.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! `UnixDatagram` split support. -//! -//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the -//! `UnixDatagram::split` method. - -use crate::future::poll_fn; -use crate::net::UnixDatagram; - -use std::io; -use std::os::unix::net::SocketAddr; -use std::path::Path; - -/// Borrowed receive half of a [`UnixDatagram`], created by [`split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`split`]: crate::net::UnixDatagram::split() -#[derive(Debug)] -pub struct RecvHalf<'a>(&'a UnixDatagram); - -/// Borrowed send half of a [`UnixDatagram`], created by [`split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`split`]: crate::net::UnixDatagram::split() -#[derive(Debug)] -pub struct SendHalf<'a>(&'a UnixDatagram); - -pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) { - (RecvHalf(&*stream), SendHalf(&*stream)) -} - -impl RecvHalf<'_> { - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await - } -} - -impl SendHalf<'_> { - /// Sends data on the socket to the specified address. - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> - where - P: AsRef<Path> + Unpin, - { - poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await - } -} - -impl AsRef<UnixDatagram> for RecvHalf<'_> { - fn as_ref(&self) -> &UnixDatagram { - self.0 - } -} - -impl AsRef<UnixDatagram> for SendHalf<'_> { - fn as_ref(&self) -> &UnixDatagram { - self.0 - } -} diff --git a/tokio/src/net/unix/datagram/split_owned.rs b/tokio/src/net/unix/datagram/split_owned.rs deleted file mode 100644 index 699771f3..00000000 --- a/tokio/src/net/unix/datagram/split_owned.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! `UnixDatagram` owned split support. -//! -//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf` -//! with the `UnixDatagram::into_split` method. - -use crate::future::poll_fn; -use crate::net::UnixDatagram; - -use std::error::Error; -use std::net::Shutdown; -use std::os::unix::net::SocketAddr; -use std::path::Path; -use std::sync::Arc; -use std::{fmt, io}; - -pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { - let shared = Arc::new(socket); - let send = shared.clone(); - let recv = shared; - ( - OwnedRecvHalf { inner: recv }, - OwnedSendHalf { - inner: send, - shutdown_on_drop: true, - }, - ) -} - -/// Owned send half of a [`UnixDatagram`], created by [`into_split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`into_split`]: UnixDatagram::into_split() -#[derive(Debug)] -pub struct OwnedSendHalf { - inner: Arc<UnixDatagram>, - shutdown_on_drop: bool, -} - -/// Owned receive half of a [`UnixDatagram`], created by [`into_split`]. -/// -/// [`UnixDatagram`]: UnixDatagram -/// [`into_split`]: UnixDatagram::into_split() -#[derive(Debug)] -pub struct OwnedRecvHalf { - inner: Arc<UnixDatagram>, -} - -/// Error indicating that two halves were not from the same socket, and thus could -/// not be `reunite`d. -#[derive(Debug)] -pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); - -impl fmt::Display for ReuniteError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "tried to reunite halves that are not from the same socket" - ) - } -} - -impl Error for ReuniteError {} - -fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { - if Arc::ptr_eq(&s.inner, &r.inner) { - s.forget(); - // Only two instances of the `Arc` are ever created, one for the - // receiver and one for the sender, and those `Arc`s are never exposed - // externally. And so when we drop one here, the other one must be the - // only remaining one. - Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite")) - } else { - Err(ReuniteError(s, r)) - } -} - -impl OwnedRecvHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to [`into_split`]. - /// - /// [`into_split`]: UnixDatagram::into_split() - pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> { - reunite(other, self) - } - - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await - } -} - -impl OwnedSendHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to [`into_split`]. - /// - /// [`into_split`]: UnixDatagram::into_split() - pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { - reunite(self, other) - } - - /// Sends data on the socket to the specified address. - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> - where - P: AsRef<Path> + Unpin, - { - poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await - } - - /// Destroy the send half, but don't close the send half of the stream - /// until the receive half is dropped. If the read half has already been - /// dropped, this closes the stream. - pub fn forget(mut self) { - self.shutdown_on_drop = false; - drop(self); - } -} - -impl Drop for OwnedSendHalf { - fn drop(&mut self) { - if self.shutdown_on_drop { - let _ = self.inner.shutdown(Shutdown::Write); - } - } -} - -impl AsRef<UnixDatagram> for OwnedSendHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} - -impl AsRef<UnixDatagram> for OwnedRecvHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} |