summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlice Ryhl <alice@ryhl.io>2020-07-24 21:56:38 +0200
committerGitHub <noreply@github.com>2020-07-24 12:56:38 -0700
commit4fca1974e9d9f95fab7d723619294cb4b2dcebbb (patch)
treeb44bd16a594adff645d8342656c3cdfcff0c54f3
parent08872c55d161cac08f4feb3e141883a47ab766cf (diff)
net: ensure that unix sockets have both split and into_split (#2687)
The documentation build failed with errors such as error: `[read]` public documentation for `take` links to a private item --> tokio/src/io/util/async_read_ext.rs:1078:9 | 1078 | / /// Creates an adaptor which reads at most `limit` bytes from it. 1079 | | /// 1080 | | /// This function returns a new instance of `AsyncRead` which will read 1081 | | /// at most `limit` bytes, after which it will always return EOF ... | 1103 | | /// } 1104 | | /// ``` | |_______________^ | note: the lint level is defined here --> tokio/src/lib.rs:13:9 | 13 | #![deny(intra_doc_link_resolution_failure)] | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ = note: the link appears in this line: bytes read and future calls to [`read()`][read] may succeed.
-rw-r--r--tokio/src/io/util/async_read_ext.rs10
-rw-r--r--tokio/src/io/util/async_write_ext.rs2
-rw-r--r--tokio/src/net/mod.rs2
-rw-r--r--tokio/src/net/tcp/split.rs6
-rw-r--r--tokio/src/net/tcp/split_owned.rs21
-rw-r--r--tokio/src/net/tcp/stream.rs10
-rw-r--r--tokio/src/net/udp/split.rs2
-rw-r--r--tokio/src/net/unix/datagram/mod.rs8
-rw-r--r--tokio/src/net/unix/datagram/socket.rs (renamed from tokio/src/net/unix/datagram.rs)159
-rw-r--r--tokio/src/net/unix/datagram/split.rs68
-rw-r--r--tokio/src/net/unix/datagram/split_owned.rs148
-rw-r--r--tokio/src/net/unix/mod.rs6
-rw-r--r--tokio/src/net/unix/split.rs25
-rw-r--r--tokio/src/net/unix/split_owned.rs187
-rw-r--r--tokio/src/net/unix/stream.rs26
-rw-r--r--tokio/src/sync/broadcast.rs1
16 files changed, 524 insertions, 157 deletions
diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs
index e848a5d2..1ed7e6cf 100644
--- a/tokio/src/io/util/async_read_ext.rs
+++ b/tokio/src/io/util/async_read_ext.rs
@@ -986,10 +986,12 @@ cfg_io_util! {
///
/// All bytes read from this source will be appended to the specified
/// buffer `buf`. This function will continuously call [`read()`] to
- /// append more data to `buf` until [`read()`][read] returns `Ok(0)`.
+ /// append more data to `buf` until [`read()`] returns `Ok(0)`.
///
/// If successful, the total number of bytes read is returned.
///
+ /// [`read()`]: fn@crate::io::AsyncReadExt::read
+ ///
/// # Errors
///
/// If a read error is encountered then the `read_to_end` operation
@@ -1018,7 +1020,7 @@ cfg_io_util! {
/// (See also the [`tokio::fs::read`] convenience function for reading from a
/// file.)
///
- /// [`tokio::fs::read`]: crate::fs::read::read
+ /// [`tokio::fs::read`]: fn@crate::fs::read
fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
where
Self: Unpin,
@@ -1078,7 +1080,9 @@ cfg_io_util! {
/// This function returns a new instance of `AsyncRead` which will read
/// at most `limit` bytes, after which it will always return EOF
/// (`Ok(0)`). Any read errors will not count towards the number of
- /// bytes read and future calls to [`read()`][read] may succeed.
+ /// bytes read and future calls to [`read()`] may succeed.
+ ///
+ /// [`read()`]: fn@crate::io::AsyncReadExt::read
///
/// # Examples
///
diff --git a/tokio/src/io/util/async_write_ext.rs b/tokio/src/io/util/async_write_ext.rs
index fa410974..321301e2 100644
--- a/tokio/src/io/util/async_write_ext.rs
+++ b/tokio/src/io/util/async_write_ext.rs
@@ -976,6 +976,8 @@ cfg_io_util! {
/// no longer attempt to write to the stream. For example, the
/// `TcpStream` implementation will issue a `shutdown(Write)` sys call.
///
+ /// [`flush`]: fn@crate::io::AsyncWriteExt::flush
+ ///
/// # Examples
///
/// ```no_run
diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs
index eb24ac0b..da6ad1fc 100644
--- a/tokio/src/net/mod.rs
+++ b/tokio/src/net/mod.rs
@@ -43,7 +43,7 @@ cfg_udp! {
cfg_uds! {
pub mod unix;
- pub use unix::datagram::UnixDatagram;
+ pub use unix::datagram::socket::UnixDatagram;
pub use unix::listener::UnixListener;
pub use unix::stream::UnixStream;
}
diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs
index 469056ac..0c1e359f 100644
--- a/tokio/src/net/tcp/split.rs
+++ b/tokio/src/net/tcp/split.rs
@@ -19,7 +19,7 @@ use std::net::Shutdown;
use std::pin::Pin;
use std::task::{Context, Poll};
-/// Read half of a [`TcpStream`], created by [`split`].
+/// Borrowed read half of a [`TcpStream`], created by [`split`].
///
/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
/// [`AsyncReadExt`] trait. Examples import this trait through [the prelude].
@@ -31,12 +31,12 @@ use std::task::{Context, Poll};
#[derive(Debug)]
pub struct ReadHalf<'a>(&'a TcpStream);
-/// Write half of a [`TcpStream`], created by [`split`].
+/// Borrowed write half of a [`TcpStream`], created by [`split`].
///
/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will
/// shut down the TCP stream in the write direction.
///
-/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods found
+/// Writing to an `WriteHalf` is usually done using the convenience methods found
/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude].
///
/// [`TcpStream`]: TcpStream
diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs
index 3f6ee33f..6c2b9e69 100644
--- a/tokio/src/net/tcp/split_owned.rs
+++ b/tokio/src/net/tcp/split_owned.rs
@@ -37,10 +37,9 @@ pub struct OwnedReadHalf {
/// Owned write half of a [`TcpStream`], created by [`into_split`].
///
-/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will
-/// shut down the TCP stream in the write direction.
-///
-/// Dropping the write half will shutdown the write half of the TCP stream.
+/// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will
+/// shut down the TCP stream in the write direction. Dropping the write half
+/// will also shut down the write half of the TCP stream.
///
/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods found
/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude].
@@ -77,13 +76,13 @@ pub(crate) fn reunite(
write.forget();
// This unwrap cannot fail as the api does not allow creating more than two Arcs,
// and we just dropped the other half.
- Ok(Arc::try_unwrap(read.inner).expect("Too many handles to Arc"))
+ Ok(Arc::try_unwrap(read.inner).expect("TcpStream: try_unwrap failed in reunite"))
} else {
Err(ReuniteError(read, write))
}
}
-/// Error indicating two halves were not from the same socket, and thus could
+/// Error indicating that two halves were not from the same socket, and thus could
/// not be reunited.
#[derive(Debug)]
pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
@@ -210,7 +209,9 @@ impl OwnedWriteHalf {
reunite(other, self)
}
- /// Drop the write half, but don't issue a TCP shutdown.
+ /// Destroy the write half, but don't close the write half of the stream
+ /// until the read half is dropped. If the read half has already been
+ /// dropped, this closes the stream.
pub fn forget(mut self) {
self.shutdown_on_drop = false;
drop(self);
@@ -250,7 +251,11 @@ impl AsyncWrite for OwnedWriteHalf {
// `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
- self.inner.shutdown(Shutdown::Write).into()
+ let res = self.inner.shutdown(Shutdown::Write);
+ if res.is_ok() {
+ Pin::into_inner(self).shutdown_on_drop = false;
+ }
+ res.into()
}
}
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index cc81e116..02b52627 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -659,6 +659,9 @@ impl TcpStream {
self.io.get_ref().set_linger(dur)
}
+ // These lifetime markers also appear in the generated documentation, and make
+ // it more clear that this is a *borrowed* split.
+ #[allow(clippy::needless_lifetimes)]
/// Splits a `TcpStream` into a read half and a write half, which can be used
/// to read and write the stream concurrently.
///
@@ -666,7 +669,7 @@ impl TcpStream {
/// moved into independently spawned tasks.
///
/// [`into_split`]: TcpStream::into_split()
- pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
+ pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
split(self)
}
@@ -676,10 +679,11 @@ impl TcpStream {
/// Unlike [`split`], the owned halves can be moved to separate tasks, however
/// this comes at the cost of a heap allocation.
///
- /// **Note::** Dropping the write half will shutdown the write half of the TCP
- /// stream. This is equivalent to calling `shutdown(Write)` on the `TcpStream`.
+ /// **Note:** Dropping the write half will shut down the write half of the TCP
+ /// stream. This is equivalent to calling [`shutdown(Write)`] on the `TcpStream`.
///
/// [`split`]: TcpStream::split()
+ /// [`shutdown(Write)`]: fn@crate::net::TcpStream::shutdown
pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
split_owned(self)
}
diff --git a/tokio/src/net/udp/split.rs b/tokio/src/net/udp/split.rs
index e8d434aa..8d87f1c7 100644
--- a/tokio/src/net/udp/split.rs
+++ b/tokio/src/net/udp/split.rs
@@ -42,7 +42,7 @@ pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) {
(RecvHalf(recv), SendHalf(send))
}
-/// Error indicating two halves were not from the same socket, and thus could
+/// Error indicating that two halves were not from the same socket, and thus could
/// not be `reunite`d.
#[derive(Debug)]
pub struct ReuniteError(pub SendHalf, pub RecvHalf);
diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs
new file mode 100644
index 00000000..f484ae34
--- /dev/null
+++ b/tokio/src/net/unix/datagram/mod.rs
@@ -0,0 +1,8 @@
+//! Unix datagram types.
+
+pub(crate) mod socket;
+pub(crate) mod split;
+pub(crate) mod split_owned;
+
+pub use split::{RecvHalf, SendHalf};
+pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError};
diff --git a/tokio/src/net/unix/datagram.rs b/tokio/src/net/unix/datagram/socket.rs
index de450e24..2fe5654e 100644
--- a/tokio/src/net/unix/datagram.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -1,15 +1,15 @@
use crate::future::poll_fn;
use crate::io::PollEvented;
+use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf};
+use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf};
use std::convert::TryFrom;
-use std::error::Error;
use std::fmt;
use std::io;
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
-use std::sync::Arc;
use std::task::{Context, Poll};
cfg_uds! {
@@ -204,8 +204,31 @@ impl UnixDatagram {
self.io.get_ref().shutdown(how)
}
+ // These lifetime markers also appear in the generated documentation, and make
+ // it more clear that this is a *borrowed* split.
+ #[allow(clippy::needless_lifetimes)]
/// Split a `UnixDatagram` into a receive half and a send half, which can be used
- /// to receice and send the datagram concurrently.
+ /// to receive and send the datagram concurrently.
+ ///
+ /// This method is more efficient than [`into_split`], but the halves cannot
+ /// be moved into independently spawned tasks.
+ ///
+ /// [`into_split`]: fn@crate::net::UnixDatagram::into_split
+ pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) {
+ split(self)
+ }
+
+ /// Split a `UnixDatagram` into a receive half and a send half, which can be used
+ /// to receive and send the datagram concurrently.
+ ///
+ /// Unlike [`split`], the owned halves can be moved to separate tasks,
+ /// however this comes at the cost of a heap allocation.
+ ///
+ /// **Note:** Dropping the write half will shut down the write half of the
+ /// datagram. This is equivalent to calling [`shutdown(Write)`].
+ ///
+ /// [`split`]: fn@crate::net::UnixDatagram::split
+ /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown
pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) {
split_owned(self)
}
@@ -248,133 +271,3 @@ impl AsRawFd for UnixDatagram {
self.io.get_ref().as_raw_fd()
}
}
-
-fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) {
- let shared = Arc::new(socket);
- let send = shared.clone();
- let recv = shared;
- (
- OwnedRecvHalf { inner: recv },
- OwnedSendHalf {
- inner: send,
- shutdown_on_drop: true,
- },
- )
-}
-
-/// The send half after [`split`](UnixDatagram::into_split).
-///
-/// Use [`send_to`](#method.send_to) or [`send`](#method.send) to send
-/// datagrams.
-#[derive(Debug)]
-pub struct OwnedSendHalf {
- inner: Arc<UnixDatagram>,
- shutdown_on_drop: bool,
-}
-
-/// The recv half after [`split`](UnixDatagram::into_split).
-///
-/// Use [`recv_from`](#method.recv_from) or [`recv`](#method.recv) to receive
-/// datagrams.
-#[derive(Debug)]
-pub struct OwnedRecvHalf {
- inner: Arc<UnixDatagram>,
-}
-
-/// Error indicating two halves were not from the same socket, and thus could
-/// not be `reunite`d.
-#[derive(Debug)]
-pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf);
-
-impl fmt::Display for ReuniteError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "tried to reunite halves that are not from the same socket"
- )
- }
-}
-
-impl Error for ReuniteError {}
-
-fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
- if Arc::ptr_eq(&s.inner, &r.inner) {
- s.forget();
- // Only two instances of the `Arc` are ever created, one for the
- // receiver and one for the sender, and those `Arc`s are never exposed
- // externally. And so when we drop one here, the other one must be the
- // only remaining one.
- Ok(Arc::try_unwrap(r.inner).expect("unixdatagram: try_unwrap failed in reunite"))
- } else {
- Err(ReuniteError(s, r))
- }
-}
-
-impl OwnedRecvHalf {
- /// Attempts to put the two "halves" of a `UnixDatagram` back together and
- /// recover the original socket. Succeeds only if the two "halves"
- /// originated from the same call to `UnixDatagram::split`.
- pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> {
- reunite(other, self)
- }
-
- /// Receives data from the socket.
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await
- }
-
- /// Receives data from the socket.
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await
- }
-}
-
-impl OwnedSendHalf {
- /// Attempts to put the two "halves" of a `UnixDatagram` back together and
- /// recover the original socket. Succeeds only if the two "halves"
- /// originated from the same call to `UnixDatagram::split`.
- pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
- reunite(self, other)
- }
-
- /// Sends data on the socket to the specified address.
- pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
- where
- P: AsRef<Path> + Unpin,
- {
- poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await
- }
-
- /// Sends data on the socket to the socket's peer.
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await
- }
-
- /// Destroy the send half, but don't close the stream until the recvice half
- /// is dropped. If the read half has already been dropped, this closes the
- /// stream.
- pub fn forget(mut self) {
- self.shutdown_on_drop = false;
- drop(self);
- }
-}
-
-impl Drop for OwnedSendHalf {
- fn drop(&mut self) {
- if self.shutdown_on_drop {
- let _ = self.inner.shutdown(Shutdown::Both);
- }
- }
-}
-
-impl AsRef<UnixDatagram> for OwnedSendHalf {
- fn as_ref(&self) -> &UnixDatagram {
- &self.inner
- }
-}
-
-impl AsRef<UnixDatagram> for OwnedRecvHalf {
- fn as_ref(&self) -> &UnixDatagram {
- &self.inner
- }
-}
diff --git a/tokio/src/net/unix/datagram/split.rs b/tokio/src/net/unix/datagram/split.rs
new file mode 100644
index 00000000..e42eeda8
--- /dev/null
+++ b/tokio/src/net/unix/datagram/split.rs
@@ -0,0 +1,68 @@
+//! `UnixDatagram` split support.
+//!
+//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the
+//! `UnixDatagram::split` method.
+
+use crate::future::poll_fn;
+use crate::net::UnixDatagram;
+
+use std::io;
+use std::os::unix::net::SocketAddr;
+use std::path::Path;
+
+/// Borrowed receive half of a [`UnixDatagram`], created by [`split`].
+///
+/// [`UnixDatagram`]: UnixDatagram
+/// [`split`]: crate::net::UnixDatagram::split()
+#[derive(Debug)]
+pub struct RecvHalf<'a>(&'a UnixDatagram);
+
+/// Borrowed send half of a [`UnixDatagram`], created by [`split`].
+///
+/// [`UnixDatagram`]: UnixDatagram
+/// [`split`]: crate::net::UnixDatagram::split()
+#[derive(Debug)]
+pub struct SendHalf<'a>(&'a UnixDatagram);
+
+pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) {
+ (RecvHalf(&*stream), SendHalf(&*stream))
+}
+
+impl RecvHalf<'_> {
+ /// Receives data from the socket.
+ pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await
+ }
+
+ /// Receives data from the socket.
+ pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await
+ }
+}
+
+impl SendHalf<'_> {
+ /// Sends data on the socket to the specified address.
+ pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
+ where
+ P: AsRef<Path> + Unpin,
+ {
+ poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await
+ }
+
+ /// Sends data on the socket to the socket's peer.
+ pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
+ poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await
+ }
+}
+
+impl AsRef<UnixDatagram> for RecvHalf<'_> {
+ fn as_ref(&self) -> &UnixDatagram {
+ self.0
+ }
+}
+
+impl AsRef<UnixDatagram> for SendHalf<'_> {
+ fn as_ref(&self) -> &UnixDatagram {
+ self.0
+ }
+}
diff --git a/tokio/src/net/unix/datagram/split_owned.rs b/tokio/src/net/unix/datagram/split_owned.rs
new file mode 100644
index 00000000..699771f3
--- /dev/null
+++ b/tokio/src/net/unix/datagram/split_owned.rs
@@ -0,0 +1,148 @@
+//! `UnixDatagram` owned split support.
+//!
+//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf`
+//! with the `UnixDatagram::into_split` method.
+
+use crate::future::poll_fn;
+use crate::net::UnixDatagram;
+
+use std::error::Error;
+use std::net::Shutdown;
+use std::os::unix::net::SocketAddr;
+use std::path::Path;
+use std::sync::Arc;
+use std::{fmt, io};
+
+pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) {
+ let shared = Arc::new(socket);
+ let send = shared.clone();
+ let recv = shared;
+ (
+ OwnedRecvHalf { inner: recv },
+ OwnedSendHalf {
+ inner: send,
+ shutdown_on_drop: true,
+ },
+ )
+}
+
+/// Owned send half of a [`UnixDatagram`], created by [`into_split`].
+///
+/// [`UnixDatagram`]: UnixDatagram
+/// [`into_split`]: UnixDatagram::into_split()
+#[derive(Debug)]
+pub struct OwnedSendHalf {
+ inner: Arc<UnixDatagram>,
+ shutdown_on_drop: bool,
+}
+
+/// Owned receive half of a [`UnixDatagram`], created by [`into_split`].
+///
+/// [`UnixDatagram`]: UnixDatagram
+/// [`into_split`]: UnixDatagram::into_split()
+#[derive(Debug)]
+pub struct OwnedRecvHalf {
+ inner: Arc<UnixDatagram>,
+}
+
+/// Error indicating that two halves were not from the same socket, and thus could
+/// not be `reunite`d.
+#[derive(Debug)]
+pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf);
+
+impl fmt::Display for ReuniteError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "tried to reunite halves that are not from the same socket"
+ )
+ }
+}
+
+impl Error for ReuniteError {}
+
+fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
+ if Arc::ptr_eq(&s.inner, &r.inner) {
+ s.forget();
+ // Only two instances of the `Arc` are ever created, one for the
+ // receiver and one for the sender, and those `Arc`s are never exposed
+ // externally. And so when we drop one here, the other one must be the
+ // only remaining one.
+ Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite"))
+ } else {
+ Err(ReuniteError(s, r))
+ }
+}
+
+impl OwnedRecvHalf {
+ /// Attempts to put the two "halves" of a `UnixDatagram` back together and
+ /// recover the original socket. Succeeds only if the two "halves"
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: UnixDatagram::into_split()
+ pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> {
+ reunite(other, self)
+ }
+
+ /// Receives data from the socket.
+ pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await
+ }
+
+ /// Receives data from the socket.
+ pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await
+ }
+}
+
+impl OwnedSendHalf {
+ /// Attempts to put the two "halves" of a `UnixDatagram` back together and
+ /// recover the original socket. Succeeds only if the two "halves"
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: UnixDatagram::into_split()
+ pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
+ reunite(self, other)
+ }
+
+ /// Sends data on the socket to the specified address.
+ pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
+ where
+ P: AsRef<Path> + Unpin,
+ {
+ poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await
+ }
+
+ /// Sends data on the socket to the socket's peer.
+ pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
+ poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await
+ }
+
+ /// Destroy the send half, but don't close the send half of the stream
+ /// until the receive half is dropped. If the read half has already been
+ /// dropped, this closes the stream.
+ pub fn forget(mut self) {
+ self.shutdown_on_drop = false;
+ drop(self);
+ }
+}
+
+impl Drop for OwnedSendHalf {
+ fn drop(&mut self) {
+ if self.shutdown_on_drop {
+ let _ = self.inner.shutdown(Shutdown::Write);
+ }
+ }
+}
+
+impl AsRef<UnixDatagram> for OwnedSendHalf {
+ fn as_ref(&self) -> &UnixDatagram {
+ &self.inner
+ }
+}
+
+impl AsRef<UnixDatagram> for OwnedRecvHalf {
+ fn as_ref(&self) -> &UnixDatagram {
+ &self.inner
+ }
+}
diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs
index f063b74b..b079fe04 100644
--- a/tokio/src/net/unix/mod.rs
+++ b/tokio/src/net/unix/mod.rs
@@ -1,7 +1,6 @@
//! Unix domain socket utility types
-pub(crate) mod datagram;
-pub use datagram::{OwnedRecvHalf, OwnedSendHalf, ReuniteError};
+pub mod datagram;
mod incoming;
pub use incoming::Incoming;
@@ -12,6 +11,9 @@ pub(crate) use listener::UnixListener;
mod split;
pub use split::{ReadHalf, WriteHalf};
+mod split_owned;
+pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
+
pub(crate) mod stream;
pub(crate) use stream::UnixStream;
diff --git a/tokio/src/net/unix/split.rs b/tokio/src/net/unix/split.rs
index 9b9fa5ee..4fd85774 100644
--- a/tokio/src/net/unix/split.rs
+++ b/tokio/src/net/unix/split.rs
@@ -17,11 +17,32 @@ use std::net::Shutdown;
use std::pin::Pin;
use std::task::{Context, Poll};
-/// Read half of a `UnixStream`.
+/// Borrowed read half of a [`UnixStream`], created by [`split`].
+///
+/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
+/// [`AsyncReadExt`] trait. Examples import this trait through [the prelude].
+///
+/// [`UnixStream`]: UnixStream
+/// [`split`]: UnixStream::split()
+/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+/// [the prelude]: crate::prelude
#[derive(Debug)]
pub struct ReadHalf<'a>(&'a UnixStream);
-/// Write half of a `UnixStream`.
+/// Borrowed write half of a [`UnixStream`], created by [`split`].
+///
+/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will
+/// shut down the UnixStream stream in the write direction.
+///
+/// Writing to an `WriteHalf` is usually done using the convenience methods found
+/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude].
+///
+/// [`UnixStream`]: UnixStream
+/// [`split`]: UnixStream::split()
+/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
+/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
+/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+/// [the prelude]: crate::prelude
#[derive(Debug)]
pub struct WriteHalf<'a>(&'a UnixStream);
diff --git a/tokio/src/net/unix/split_owned.rs b/tokio/src/net/unix/split_owned.rs
new file mode 100644
index 00000000..eb35304b
--- /dev/null
+++ b/tokio/src/net/unix/split_owned.rs
@@ -0,0 +1,187 @@
+//! `UnixStream` owned split support.
+//!
+//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf`
+//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements
+//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
+//!
+//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
+//! split has no associated overhead and enforces all invariants at the type
+//! level.
+
+use crate::io::{AsyncRead, AsyncWrite};
+use crate::net::UnixStream;
+
+use std::error::Error;
+use std::mem::MaybeUninit;
+use std::net::Shutdown;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, io};
+
+/// Owned read half of a [`UnixStream`], created by [`into_split`].
+///
+/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
+/// on the [`AsyncReadExt`] trait. Examples import this trait through [the prelude].
+///
+/// [`UnixStream`]: crate::net::UnixStream
+/// [`into_split`]: crate::net::UnixStream::into_split()
+/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+/// [the prelude]: crate::prelude
+#[derive(Debug)]
+pub struct OwnedReadHalf {
+ inner: Arc<UnixStream>,
+}
+
+/// Owned write half of a [`UnixStream`], created by [`into_split`].
+///
+/// Note that in the [`AsyncWrite`] implementation of this type,
+/// [`poll_shutdown`] will shut down the stream in the write direction.
+/// Dropping the write half will also shut down the write half of the stream.
+///
+/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
+/// found on the [`AsyncWriteExt`] trait. Examples import this trait through
+/// [the prelude].
+///
+/// [`UnixStream`]: crate::net::UnixStream
+/// [`into_split`]: crate::net::UnixStream::into_split()
+/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
+/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
+/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+/// [the prelude]: crate::prelude
+#[derive(Debug)]
+pub struct OwnedWriteHalf {
+ inner: Arc<UnixStream>,
+ shutdown_on_drop: bool,
+}
+
+pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
+ let arc = Arc::new(stream);
+ let read = OwnedReadHalf {
+ inner: Arc::clone(&arc),
+ };
+ let write = OwnedWriteHalf {
+ inner: arc,
+ shutdown_on_drop: true,
+ };
+ (read, write)
+}
+
+pub(crate) fn reunite(
+ read: OwnedReadHalf,
+ write: OwnedWriteHalf,
+) -> Result<UnixStream, ReuniteError> {
+ if Arc::ptr_eq(&read.inner, &write.inner) {
+ write.forget();
+ // This unwrap cannot fail as the api does not allow creating more than two Arcs,
+ // and we just dropped the other half.
+ Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
+ } else {
+ Err(ReuniteError(read, write))
+ }
+}
+
+/// Error indicating that two halves were not from the same socket, and thus could
+/// not be reunited.
+#[derive(Debug)]
+pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
+
+impl fmt::Display for ReuniteError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "tried to reunite halves that are not from the same socket"
+ )
+ }
+}
+
+impl Error for ReuniteError {}
+
+impl OwnedReadHalf {
+ /// Attempts to put the two halves of a `UnixStream` back together and
+ /// recover the original socket. Succeeds only if the two halves
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: crate::net::UnixStream::into_split()
+ pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
+ reunite(self, other)
+ }
+}
+
+impl AsyncRead for OwnedReadHalf {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool {
+ false
+ }
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_read_priv(cx, buf)
+ }
+}
+
+impl OwnedWriteHalf {
+ /// Attempts to put the two halves of a `UnixStream` back together and
+ /// recover the original socket. Succeeds only if the two halves
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: crate::net::UnixStream::into_split()
+ pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
+ reunite(other, self)
+ }
+
+ /// Destroy the write half, but don't close the write half of the stream
+ /// until the read half is dropped. If the read half has already been
+ /// dropped, this closes the stream.
+ pub fn forget(mut self) {
+ self.shutdown_on_drop = false;
+ drop(self);
+ }
+}
+
+impl Drop for OwnedWriteHalf {
+ fn drop(&mut self) {
+ if self.shutdown_on_drop {
+ let _ = self.inner.shutdown(Shutdown::Write);
+ }
+ }
+}
+
+impl AsyncWrite for OwnedWriteHalf {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_write_priv(cx, buf)
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ // flush is a no-op
+ Poll::Ready(Ok(()))
+ }
+
+ // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let res = self.inner.shutdown(Shutdown::Write);
+ if res.is_ok() {
+ Pi