summaryrefslogtreecommitdiffstats
path: root/tokio/src/net/unix/datagram
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/net/unix/datagram')
-rw-r--r--tokio/src/net/unix/datagram/mod.rs5
-rw-r--r--tokio/src/net/unix/datagram/socket.rs227
-rw-r--r--tokio/src/net/unix/datagram/split.rs68
-rw-r--r--tokio/src/net/unix/datagram/split_owned.rs148
4 files changed, 34 insertions, 414 deletions
diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs
index f484ae34..6268b4ac 100644
--- a/tokio/src/net/unix/datagram/mod.rs
+++ b/tokio/src/net/unix/datagram/mod.rs
@@ -1,8 +1,3 @@
//! Unix datagram types.
pub(crate) mod socket;
-pub(crate) mod split;
-pub(crate) mod split_owned;
-
-pub use split::{RecvHalf, SendHalf};
-pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError};
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index ba3a10c4..78baf279 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -1,7 +1,4 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf};
-use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf};
use std::convert::TryFrom;
use std::fmt;
@@ -10,7 +7,6 @@ use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
-use std::task::{Context, Poll};
cfg_uds! {
/// An I/O object representing a Unix datagram socket.
@@ -38,9 +34,9 @@ cfg_uds! {
///
/// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx");
- /// let mut tx = UnixDatagram::bind(&tx_path)?;
+ /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?;
@@ -64,7 +60,7 @@ cfg_uds! {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -128,7 +124,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -208,12 +204,12 @@ impl UnixDatagram {
/// use tempfile::tempdir;
///
/// // Create an unbound socket
- /// let mut tx = UnixDatagram::unbound()?;
+ /// let tx = UnixDatagram::unbound()?;
///
/// // Create another, bound socket
/// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// // Send to the bound socket
/// let bytes = b"hello world";
@@ -247,12 +243,12 @@ impl UnixDatagram {
/// use tempfile::tempdir;
///
/// // Create an unbound socket
- /// let mut tx = UnixDatagram::unbound()?;
+ /// let tx = UnixDatagram::unbound()?;
///
/// // Create another, bound socket
/// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// // Connect to the bound socket
/// tx.connect(&rx_path)?;
@@ -284,7 +280,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -300,8 +296,10 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_send_priv(cx, buf)).await
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::writable(), |sock| sock.send(buf))
+ .await
}
/// Try to send a datagram to the peer without waiting.
@@ -371,32 +369,6 @@ impl UnixDatagram {
self.io.get_ref().send_to(buf, target)
}
- // Poll IO functions that takes `&self` are provided for the split API.
- //
- // They are not public because (taken from the doc of `PollEvented`):
- //
- // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
- // caller must ensure that there are at most two tasks that use a
- // `PollEvented` instance concurrently. One for reading and one for writing.
- // While violating this requirement is "safe" from a Rust memory model point
- // of view, it will result in unexpected behavior in the form of lost
- // notifications and tasks hanging.
- pub(crate) fn poll_send_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
/// Receives data from the socket.
///
/// # Examples
@@ -407,7 +379,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -423,8 +395,10 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
+ pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::readable(), |sock| sock.recv(buf))
+ .await
}
/// Try to receive a datagram from the peer without waiting.
@@ -455,22 +429,6 @@ impl UnixDatagram {
self.io.get_ref().recv(buf)
}
- pub(crate) fn poll_recv_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
/// Sends data on the socket to the specified address.
///
/// # Examples
@@ -487,9 +445,9 @@ impl UnixDatagram {
///
/// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx");
- /// let mut tx = UnixDatagram::bind(&tx_path)?;
+ /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?;
@@ -504,28 +462,15 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
+ pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where
- P: AsRef<Path> + Unpin,
+ P: AsRef<Path>,
{
- poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await
- }
-
- pub(crate) fn poll_send_to_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- target: &Path,
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send_to(buf, target) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ self.io
+ .async_io(mio::Ready::writable(), |sock| {
+ sock.send_to(buf, target.as_ref())
+ })
+ .await
}
/// Receives data from the socket.
@@ -544,9 +489,9 @@ impl UnixDatagram {
///
/// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx");
- /// let mut tx = UnixDatagram::bind(&tx_path)?;
+ /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?;
@@ -561,8 +506,10 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await
+ pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf))
+ .await
}
/// Try to receive data from the socket without waiting.
@@ -601,22 +548,6 @@ impl UnixDatagram {
self.io.get_ref().recv_from(buf)
}
- pub(crate) fn poll_recv_from_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<Result<(usize, SocketAddr), io::Error>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv_from(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
/// Returns the local address that this socket is bound to.
///
/// # Examples
@@ -748,7 +679,7 @@ impl UnixDatagram {
/// use std::net::Shutdown;
///
/// // Create an unbound socket
- /// let (mut socket, other) = UnixDatagram::pair()?;
+ /// let (socket, other) = UnixDatagram::pair()?;
///
/// socket.shutdown(Shutdown::Both)?;
///
@@ -768,102 +699,12 @@ impl UnixDatagram {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}
-
- // These lifetime markers also appear in the generated documentation, and make
- // it more clear that this is a *borrowed* split.
- #[allow(clippy::needless_lifetimes)]
- /// Split a `UnixDatagram` into a receive half and a send half, which can be used
- /// to receive and send the datagram concurrently.
- ///
- /// This method is more efficient than [`into_split`], but the halves cannot
- /// be moved into independently spawned tasks.
- ///
- /// [`into_split`]: fn@crate::net::UnixDatagram::into_split
- ///
- /// # Examples
- /// ```
- /// # use std::error::Error;
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn Error>> {
- /// use tokio::net::UnixDatagram;
- ///
- /// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
- ///
- /// // Split sock1
- /// let (sock1_rx, mut sock1_tx) = sock1.split();
- ///
- /// // Since the sockets are paired, the paired send/recv
- /// // functions can be used
- /// let bytes = b"hello world";
- /// sock1_tx.send(bytes).await?;
- ///
- /// let mut buff = vec![0u8; 24];
- /// let size = sock2.recv(&mut buff).await?;
- ///
- /// let dgram = &buff[..size];
- /// assert_eq!(dgram, bytes);
- ///
- /// # Ok(())
- /// # }
- /// ```
- pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) {
- split(self)
- }
-
- /// Split a `UnixDatagram` into a receive half and a send half, which can be used
- /// to receive and send the datagram concurrently.
- ///
- /// Unlike [`split`], the owned halves can be moved to separate tasks,
- /// however this comes at the cost of a heap allocation.
- ///
- /// **Note:** Dropping the write half will shut down the write half of the
- /// datagram. This is equivalent to calling [`shutdown(Write)`].
- ///
- /// # Examples
- /// ```
- /// # use std::error::Error;
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn Error>> {
- /// use tokio::net::UnixDatagram;
- ///
- /// // Create the pair of sockets
- /// let (sock1, mut sock2) = UnixDatagram::pair()?;
- ///
- /// // Split sock1
- /// let (sock1_rx, mut sock1_tx) = sock1.into_split();
- ///
- /// // Since the sockets are paired, the paired send/recv
- /// // functions can be used
- /// let bytes = b"hello world";
- /// sock1_tx.send(bytes).await?;
- ///
- /// let mut buff = vec![0u8; 24];
- /// let size = sock2.recv(&mut buff).await?;
- ///
- /// let dgram = &buff[..size];
- /// assert_eq!(dgram, bytes);
- ///
- /// # Ok(())
- /// # }
- /// ```
- ///
- /// [`split`]: fn@crate::net::UnixDatagram::split
- /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown
- pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) {
- split_owned(self)
- }
}
impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> {
value.io.into_inner()
}
diff --git a/tokio/src/net/unix/datagram/split.rs b/tokio/src/net/unix/datagram/split.rs
deleted file mode 100644
index e42eeda8..00000000
--- a/tokio/src/net/unix/datagram/split.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-//! `UnixDatagram` split support.
-//!
-//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the
-//! `UnixDatagram::split` method.
-
-use crate::future::poll_fn;
-use crate::net::UnixDatagram;
-
-use std::io;
-use std::os::unix::net::SocketAddr;
-use std::path::Path;
-
-/// Borrowed receive half of a [`UnixDatagram`], created by [`split`].
-///
-/// [`UnixDatagram`]: UnixDatagram
-/// [`split`]: crate::net::UnixDatagram::split()
-#[derive(Debug)]
-pub struct RecvHalf<'a>(&'a UnixDatagram);
-
-/// Borrowed send half of a [`UnixDatagram`], created by [`split`].
-///
-/// [`UnixDatagram`]: UnixDatagram
-/// [`split`]: crate::net::UnixDatagram::split()
-#[derive(Debug)]
-pub struct SendHalf<'a>(&'a UnixDatagram);
-
-pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) {
- (RecvHalf(&*stream), SendHalf(&*stream))
-}
-
-impl RecvHalf<'_> {
- /// Receives data from the socket.
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await
- }
-
- /// Receives data from the socket.
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await
- }
-}
-
-impl SendHalf<'_> {
- /// Sends data on the socket to the specified address.
- pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
- where
- P: AsRef<Path> + Unpin,
- {
- poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await
- }
-
- /// Sends data on the socket to the socket's peer.
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await
- }
-}
-
-impl AsRef<UnixDatagram> for RecvHalf<'_> {
- fn as_ref(&self) -> &UnixDatagram {
- self.0
- }
-}
-
-impl AsRef<UnixDatagram> for SendHalf<'_> {
- fn as_ref(&self) -> &UnixDatagram {
- self.0
- }
-}
diff --git a/tokio/src/net/unix/datagram/split_owned.rs b/tokio/src/net/unix/datagram/split_owned.rs
deleted file mode 100644
index 699771f3..00000000
--- a/tokio/src/net/unix/datagram/split_owned.rs
+++ /dev/null
@@ -1,148 +0,0 @@
-//! `UnixDatagram` owned split support.
-//!
-//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf`
-//! with the `UnixDatagram::into_split` method.
-
-use crate::future::poll_fn;
-use crate::net::UnixDatagram;
-
-use std::error::Error;
-use std::net::Shutdown;
-use std::os::unix::net::SocketAddr;
-use std::path::Path;
-use std::sync::Arc;
-use std::{fmt, io};
-
-pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) {
- let shared = Arc::new(socket);
- let send = shared.clone();
- let recv = shared;
- (
- OwnedRecvHalf { inner: recv },
- OwnedSendHalf {
- inner: send,
- shutdown_on_drop: true,
- },
- )
-}
-
-/// Owned send half of a [`UnixDatagram`], created by [`into_split`].
-///
-/// [`UnixDatagram`]: UnixDatagram
-/// [`into_split`]: UnixDatagram::into_split()
-#[derive(Debug)]
-pub struct OwnedSendHalf {
- inner: Arc<UnixDatagram>,
- shutdown_on_drop: bool,
-}
-
-/// Owned receive half of a [`UnixDatagram`], created by [`into_split`].
-///
-/// [`UnixDatagram`]: UnixDatagram
-/// [`into_split`]: UnixDatagram::into_split()
-#[derive(Debug)]
-pub struct OwnedRecvHalf {
- inner: Arc<UnixDatagram>,
-}
-
-/// Error indicating that two halves were not from the same socket, and thus could
-/// not be `reunite`d.
-#[derive(Debug)]
-pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf);
-
-impl fmt::Display for ReuniteError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "tried to reunite halves that are not from the same socket"
- )
- }
-}
-
-impl Error for ReuniteError {}
-
-fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
- if Arc::ptr_eq(&s.inner, &r.inner) {
- s.forget();
- // Only two instances of the `Arc` are ever created, one for the
- // receiver and one for the sender, and those `Arc`s are never exposed
- // externally. And so when we drop one here, the other one must be the
- // only remaining one.
- Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite"))
- } else {
- Err(ReuniteError(s, r))
- }
-}
-
-impl OwnedRecvHalf {
- /// Attempts to put the two "halves" of a `UnixDatagram` back together and
- /// recover the original socket. Succeeds only if the two "halves"
- /// originated from the same call to [`into_split`].
- ///
- /// [`into_split`]: UnixDatagram::into_split()
- pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> {
- reunite(other, self)
- }
-
- /// Receives data from the socket.
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await
- }
-
- /// Receives data from the socket.
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await
- }
-}
-
-impl OwnedSendHalf {
- /// Attempts to put the two "halves" of a `UnixDatagram` back together and
- /// recover the original socket. Succeeds only if the two "halves"
- /// originated from the same call to [`into_split`].
- ///
- /// [`into_split`]: UnixDatagram::into_split()
- pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
- reunite(self, other)
- }
-
- /// Sends data on the socket to the specified address.
- pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
- where
- P: AsRef<Path> + Unpin,
- {
- poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await
- }
-
- /// Sends data on the socket to the socket's peer.
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await
- }
-
- /// Destroy the send half, but don't close the send half of the stream
- /// until the receive half is dropped. If the read half has already been
- /// dropped, this closes the stream.
- pub fn forget(mut self) {
- self.shutdown_on_drop = false;
- drop(self);
- }
-}
-
-impl Drop for OwnedSendHalf {
- fn drop(&mut self) {
- if self.shutdown_on_drop {
- let _ = self.inner.shutdown(Shutdown::Write);
- }
- }
-}
-
-impl AsRef<UnixDatagram> for OwnedSendHalf {
- fn as_ref(&self) -> &UnixDatagram {
- &self.inner
- }
-}
-
-impl AsRef<UnixDatagram> for OwnedRecvHalf {
- fn as_ref(&self) -> &UnixDatagram {
- &self.inner
- }
-}