diff options
author | Alice Ryhl <alice@ryhl.io> | 2020-07-24 21:56:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-24 12:56:38 -0700 |
commit | 4fca1974e9d9f95fab7d723619294cb4b2dcebbb (patch) | |
tree | b44bd16a594adff645d8342656c3cdfcff0c54f3 | |
parent | 08872c55d161cac08f4feb3e141883a47ab766cf (diff) |
net: ensure that unix sockets have both split and into_split (#2687)
The documentation build failed with errors such as
error: `[read]` public documentation for `take` links to a private item
--> tokio/src/io/util/async_read_ext.rs:1078:9
|
1078 | / /// Creates an adaptor which reads at most `limit` bytes from it.
1079 | | ///
1080 | | /// This function returns a new instance of `AsyncRead` which will read
1081 | | /// at most `limit` bytes, after which it will always return EOF
... |
1103 | | /// }
1104 | | /// ```
| |_______________^
|
note: the lint level is defined here
--> tokio/src/lib.rs:13:9
|
13 | #![deny(intra_doc_link_resolution_failure)]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
= note: the link appears in this line:
bytes read and future calls to [`read()`][read] may succeed.
-rw-r--r-- | tokio/src/io/util/async_read_ext.rs | 10 | ||||
-rw-r--r-- | tokio/src/io/util/async_write_ext.rs | 2 | ||||
-rw-r--r-- | tokio/src/net/mod.rs | 2 | ||||
-rw-r--r-- | tokio/src/net/tcp/split.rs | 6 | ||||
-rw-r--r-- | tokio/src/net/tcp/split_owned.rs | 21 | ||||
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 10 | ||||
-rw-r--r-- | tokio/src/net/udp/split.rs | 2 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs (renamed from tokio/src/net/unix/datagram.rs) | 159 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split.rs | 68 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split_owned.rs | 148 | ||||
-rw-r--r-- | tokio/src/net/unix/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/net/unix/split.rs | 25 | ||||
-rw-r--r-- | tokio/src/net/unix/split_owned.rs | 187 | ||||
-rw-r--r-- | tokio/src/net/unix/stream.rs | 26 | ||||
-rw-r--r-- | tokio/src/sync/broadcast.rs | 1 |
16 files changed, 524 insertions, 157 deletions
diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index e848a5d2..1ed7e6cf 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -986,10 +986,12 @@ cfg_io_util! { /// /// All bytes read from this source will be appended to the specified /// buffer `buf`. This function will continuously call [`read()`] to - /// append more data to `buf` until [`read()`][read] returns `Ok(0)`. + /// append more data to `buf` until [`read()`] returns `Ok(0)`. /// /// If successful, the total number of bytes read is returned. /// + /// [`read()`]: fn@crate::io::AsyncReadExt::read + /// /// # Errors /// /// If a read error is encountered then the `read_to_end` operation @@ -1018,7 +1020,7 @@ cfg_io_util! { /// (See also the [`tokio::fs::read`] convenience function for reading from a /// file.) /// - /// [`tokio::fs::read`]: crate::fs::read::read + /// [`tokio::fs::read`]: fn@crate::fs::read fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self> where Self: Unpin, @@ -1078,7 +1080,9 @@ cfg_io_util! { /// This function returns a new instance of `AsyncRead` which will read /// at most `limit` bytes, after which it will always return EOF /// (`Ok(0)`). Any read errors will not count towards the number of - /// bytes read and future calls to [`read()`][read] may succeed. + /// bytes read and future calls to [`read()`] may succeed. + /// + /// [`read()`]: fn@crate::io::AsyncReadExt::read /// /// # Examples /// diff --git a/tokio/src/io/util/async_write_ext.rs b/tokio/src/io/util/async_write_ext.rs index fa410974..321301e2 100644 --- a/tokio/src/io/util/async_write_ext.rs +++ b/tokio/src/io/util/async_write_ext.rs @@ -976,6 +976,8 @@ cfg_io_util! { /// no longer attempt to write to the stream. For example, the /// `TcpStream` implementation will issue a `shutdown(Write)` sys call. /// + /// [`flush`]: fn@crate::io::AsyncWriteExt::flush + /// /// # Examples /// /// ```no_run diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs index eb24ac0b..da6ad1fc 100644 --- a/tokio/src/net/mod.rs +++ b/tokio/src/net/mod.rs @@ -43,7 +43,7 @@ cfg_udp! { cfg_uds! { pub mod unix; - pub use unix::datagram::UnixDatagram; + pub use unix::datagram::socket::UnixDatagram; pub use unix::listener::UnixListener; pub use unix::stream::UnixStream; } diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 469056ac..0c1e359f 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -19,7 +19,7 @@ use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; -/// Read half of a [`TcpStream`], created by [`split`]. +/// Borrowed read half of a [`TcpStream`], created by [`split`]. /// /// Reading from a `ReadHalf` is usually done using the convenience methods found on the /// [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. @@ -31,12 +31,12 @@ use std::task::{Context, Poll}; #[derive(Debug)] pub struct ReadHalf<'a>(&'a TcpStream); -/// Write half of a [`TcpStream`], created by [`split`]. +/// Borrowed write half of a [`TcpStream`], created by [`split`]. /// /// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will /// shut down the TCP stream in the write direction. /// -/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods found +/// Writing to an `WriteHalf` is usually done using the convenience methods found /// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. /// /// [`TcpStream`]: TcpStream diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs index 3f6ee33f..6c2b9e69 100644 --- a/tokio/src/net/tcp/split_owned.rs +++ b/tokio/src/net/tcp/split_owned.rs @@ -37,10 +37,9 @@ pub struct OwnedReadHalf { /// Owned write half of a [`TcpStream`], created by [`into_split`]. /// -/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will -/// shut down the TCP stream in the write direction. -/// -/// Dropping the write half will shutdown the write half of the TCP stream. +/// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will +/// shut down the TCP stream in the write direction. Dropping the write half +/// will also shut down the write half of the TCP stream. /// /// Writing to an `OwnedWriteHalf` is usually done using the convenience methods found /// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. @@ -77,13 +76,13 @@ pub(crate) fn reunite( write.forget(); // This unwrap cannot fail as the api does not allow creating more than two Arcs, // and we just dropped the other half. - Ok(Arc::try_unwrap(read.inner).expect("Too many handles to Arc")) + Ok(Arc::try_unwrap(read.inner).expect("TcpStream: try_unwrap failed in reunite")) } else { Err(ReuniteError(read, write)) } } -/// Error indicating two halves were not from the same socket, and thus could +/// Error indicating that two halves were not from the same socket, and thus could /// not be reunited. #[derive(Debug)] pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf); @@ -210,7 +209,9 @@ impl OwnedWriteHalf { reunite(other, self) } - /// Drop the write half, but don't issue a TCP shutdown. + /// Destroy the write half, but don't close the write half of the stream + /// until the read half is dropped. If the read half has already been + /// dropped, this closes the stream. pub fn forget(mut self) { self.shutdown_on_drop = false; drop(self); @@ -250,7 +251,11 @@ impl AsyncWrite for OwnedWriteHalf { // `poll_shutdown` on a write half shutdowns the stream in the "write" direction. fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { - self.inner.shutdown(Shutdown::Write).into() + let res = self.inner.shutdown(Shutdown::Write); + if res.is_ok() { + Pin::into_inner(self).shutdown_on_drop = false; + } + res.into() } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index cc81e116..02b52627 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -659,6 +659,9 @@ impl TcpStream { self.io.get_ref().set_linger(dur) } + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] /// Splits a `TcpStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. /// @@ -666,7 +669,7 @@ impl TcpStream { /// moved into independently spawned tasks. /// /// [`into_split`]: TcpStream::into_split() - pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { split(self) } @@ -676,10 +679,11 @@ impl TcpStream { /// Unlike [`split`], the owned halves can be moved to separate tasks, however /// this comes at the cost of a heap allocation. /// - /// **Note::** Dropping the write half will shutdown the write half of the TCP - /// stream. This is equivalent to calling `shutdown(Write)` on the `TcpStream`. + /// **Note:** Dropping the write half will shut down the write half of the TCP + /// stream. This is equivalent to calling [`shutdown(Write)`] on the `TcpStream`. /// /// [`split`]: TcpStream::split() + /// [`shutdown(Write)`]: fn@crate::net::TcpStream::shutdown pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { split_owned(self) } diff --git a/tokio/src/net/udp/split.rs b/tokio/src/net/udp/split.rs index e8d434aa..8d87f1c7 100644 --- a/tokio/src/net/udp/split.rs +++ b/tokio/src/net/udp/split.rs @@ -42,7 +42,7 @@ pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) { (RecvHalf(recv), SendHalf(send)) } -/// Error indicating two halves were not from the same socket, and thus could +/// Error indicating that two halves were not from the same socket, and thus could /// not be `reunite`d. #[derive(Debug)] pub struct ReuniteError(pub SendHalf, pub RecvHalf); diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs new file mode 100644 index 00000000..f484ae34 --- /dev/null +++ b/tokio/src/net/unix/datagram/mod.rs @@ -0,0 +1,8 @@ +//! Unix datagram types. + +pub(crate) mod socket; +pub(crate) mod split; +pub(crate) mod split_owned; + +pub use split::{RecvHalf, SendHalf}; +pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; diff --git a/tokio/src/net/unix/datagram.rs b/tokio/src/net/unix/datagram/socket.rs index de450e24..2fe5654e 100644 --- a/tokio/src/net/unix/datagram.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,15 +1,15 @@ use crate::future::poll_fn; use crate::io::PollEvented; +use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; +use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; use std::convert::TryFrom; -use std::error::Error; use std::fmt; use std::io; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; -use std::sync::Arc; use std::task::{Context, Poll}; cfg_uds! { @@ -204,8 +204,31 @@ impl UnixDatagram { self.io.get_ref().shutdown(how) } + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receice and send the datagram concurrently. + /// to receive and send the datagram concurrently. + /// + /// This method is more efficient than [`into_split`], but the halves cannot + /// be moved into independently spawned tasks. + /// + /// [`into_split`]: fn@crate::net::UnixDatagram::into_split + pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) { + split(self) + } + + /// Split a `UnixDatagram` into a receive half and a send half, which can be used + /// to receive and send the datagram concurrently. + /// + /// Unlike [`split`], the owned halves can be moved to separate tasks, + /// however this comes at the cost of a heap allocation. + /// + /// **Note:** Dropping the write half will shut down the write half of the + /// datagram. This is equivalent to calling [`shutdown(Write)`]. + /// + /// [`split`]: fn@crate::net::UnixDatagram::split + /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { split_owned(self) } @@ -248,133 +271,3 @@ impl AsRawFd for UnixDatagram { self.io.get_ref().as_raw_fd() } } - -fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { - let shared = Arc::new(socket); - let send = shared.clone(); - let recv = shared; - ( - OwnedRecvHalf { inner: recv }, - OwnedSendHalf { - inner: send, - shutdown_on_drop: true, - }, - ) -} - -/// The send half after [`split`](UnixDatagram::into_split). -/// -/// Use [`send_to`](#method.send_to) or [`send`](#method.send) to send -/// datagrams. -#[derive(Debug)] -pub struct OwnedSendHalf { - inner: Arc<UnixDatagram>, - shutdown_on_drop: bool, -} - -/// The recv half after [`split`](UnixDatagram::into_split). -/// -/// Use [`recv_from`](#method.recv_from) or [`recv`](#method.recv) to receive -/// datagrams. -#[derive(Debug)] -pub struct OwnedRecvHalf { - inner: Arc<UnixDatagram>, -} - -/// Error indicating two halves were not from the same socket, and thus could -/// not be `reunite`d. -#[derive(Debug)] -pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); - -impl fmt::Display for ReuniteError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "tried to reunite halves that are not from the same socket" - ) - } -} - -impl Error for ReuniteError {} - -fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { - if Arc::ptr_eq(&s.inner, &r.inner) { - s.forget(); - // Only two instances of the `Arc` are ever created, one for the - // receiver and one for the sender, and those `Arc`s are never exposed - // externally. And so when we drop one here, the other one must be the - // only remaining one. - Ok(Arc::try_unwrap(r.inner).expect("unixdatagram: try_unwrap failed in reunite")) - } else { - Err(ReuniteError(s, r)) - } -} - -impl OwnedRecvHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UnixDatagram::split`. - pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> { - reunite(other, self) - } - - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await - } -} - -impl OwnedSendHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UnixDatagram::split`. - pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { - reunite(self, other) - } - - /// Sends data on the socket to the specified address. - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> - where - P: AsRef<Path> + Unpin, - { - poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await - } - - /// Destroy the send half, but don't close the stream until the recvice half - /// is dropped. If the read half has already been dropped, this closes the - /// stream. - pub fn forget(mut self) { - self.shutdown_on_drop = false; - drop(self); - } -} - -impl Drop for OwnedSendHalf { - fn drop(&mut self) { - if self.shutdown_on_drop { - let _ = self.inner.shutdown(Shutdown::Both); - } - } -} - -impl AsRef<UnixDatagram> for OwnedSendHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} - -impl AsRef<UnixDatagram> for OwnedRecvHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} diff --git a/tokio/src/net/unix/datagram/split.rs b/tokio/src/net/unix/datagram/split.rs new file mode 100644 index 00000000..e42eeda8 --- /dev/null +++ b/tokio/src/net/unix/datagram/split.rs @@ -0,0 +1,68 @@ +//! `UnixDatagram` split support. +//! +//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the +//! `UnixDatagram::split` method. + +use crate::future::poll_fn; +use crate::net::UnixDatagram; + +use std::io; +use std::os::unix::net::SocketAddr; +use std::path::Path; + +/// Borrowed receive half of a [`UnixDatagram`], created by [`split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`split`]: crate::net::UnixDatagram::split() +#[derive(Debug)] +pub struct RecvHalf<'a>(&'a UnixDatagram); + +/// Borrowed send half of a [`UnixDatagram`], created by [`split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`split`]: crate::net::UnixDatagram::split() +#[derive(Debug)] +pub struct SendHalf<'a>(&'a UnixDatagram); + +pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) { + (RecvHalf(&*stream), SendHalf(&*stream)) +} + +impl RecvHalf<'_> { + /// Receives data from the socket. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await + } + + /// Receives data from the socket. + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await + } +} + +impl SendHalf<'_> { + /// Sends data on the socket to the specified address. + pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + where + P: AsRef<Path> + Unpin, + { + poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await + } + + /// Sends data on the socket to the socket's peer. + pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await + } +} + +impl AsRef<UnixDatagram> for RecvHalf<'_> { + fn as_ref(&self) -> &UnixDatagram { + self.0 + } +} + +impl AsRef<UnixDatagram> for SendHalf<'_> { + fn as_ref(&self) -> &UnixDatagram { + self.0 + } +} diff --git a/tokio/src/net/unix/datagram/split_owned.rs b/tokio/src/net/unix/datagram/split_owned.rs new file mode 100644 index 00000000..699771f3 --- /dev/null +++ b/tokio/src/net/unix/datagram/split_owned.rs @@ -0,0 +1,148 @@ +//! `UnixDatagram` owned split support. +//! +//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf` +//! with the `UnixDatagram::into_split` method. + +use crate::future::poll_fn; +use crate::net::UnixDatagram; + +use std::error::Error; +use std::net::Shutdown; +use std::os::unix::net::SocketAddr; +use std::path::Path; +use std::sync::Arc; +use std::{fmt, io}; + +pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { + let shared = Arc::new(socket); + let send = shared.clone(); + let recv = shared; + ( + OwnedRecvHalf { inner: recv }, + OwnedSendHalf { + inner: send, + shutdown_on_drop: true, + }, + ) +} + +/// Owned send half of a [`UnixDatagram`], created by [`into_split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`into_split`]: UnixDatagram::into_split() +#[derive(Debug)] +pub struct OwnedSendHalf { + inner: Arc<UnixDatagram>, + shutdown_on_drop: bool, +} + +/// Owned receive half of a [`UnixDatagram`], created by [`into_split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`into_split`]: UnixDatagram::into_split() +#[derive(Debug)] +pub struct OwnedRecvHalf { + inner: Arc<UnixDatagram>, +} + +/// Error indicating that two halves were not from the same socket, and thus could +/// not be `reunite`d. +#[derive(Debug)] +pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { + if Arc::ptr_eq(&s.inner, &r.inner) { + s.forget(); + // Only two instances of the `Arc` are ever created, one for the + // receiver and one for the sender, and those `Arc`s are never exposed + // externally. And so when we drop one here, the other one must be the + // only remaining one. + Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(s, r)) + } +} + +impl OwnedRecvHalf { + /// Attempts to put the two "halves" of a `UnixDatagram` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: UnixDatagram::into_split() + pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> { + reunite(other, self) + } + + /// Receives data from the socket. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await + } + + /// Receives data from the socket. + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await + } +} + +impl OwnedSendHalf { + /// Attempts to put the two "halves" of a `UnixDatagram` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: UnixDatagram::into_split() + pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { + reunite(self, other) + } + + /// Sends data on the socket to the specified address. + pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + where + P: AsRef<Path> + Unpin, + { + poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await + } + + /// Sends data on the socket to the socket's peer. + pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await + } + + /// Destroy the send half, but don't close the send half of the stream + /// until the receive half is dropped. If the read half has already been + /// dropped, this closes the stream. + pub fn forget(mut self) { + self.shutdown_on_drop = false; + drop(self); + } +} + +impl Drop for OwnedSendHalf { + fn drop(&mut self) { + if self.shutdown_on_drop { + let _ = self.inner.shutdown(Shutdown::Write); + } + } +} + +impl AsRef<UnixDatagram> for OwnedSendHalf { + fn as_ref(&self) -> &UnixDatagram { + &self.inner + } +} + +impl AsRef<UnixDatagram> for OwnedRecvHalf { + fn as_ref(&self) -> &UnixDatagram { + &self.inner + } +} diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index f063b74b..b079fe04 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -1,7 +1,6 @@ //! Unix domain socket utility types -pub(crate) mod datagram; -pub use datagram::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; +pub mod datagram; mod incoming; pub use incoming::Incoming; @@ -12,6 +11,9 @@ pub(crate) use listener::UnixListener; mod split; pub use split::{ReadHalf, WriteHalf}; +mod split_owned; +pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError}; + pub(crate) mod stream; pub(crate) use stream::UnixStream; diff --git a/tokio/src/net/unix/split.rs b/tokio/src/net/unix/split.rs index 9b9fa5ee..4fd85774 100644 --- a/tokio/src/net/unix/split.rs +++ b/tokio/src/net/unix/split.rs @@ -17,11 +17,32 @@ use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; -/// Read half of a `UnixStream`. +/// Borrowed read half of a [`UnixStream`], created by [`split`]. +/// +/// Reading from a `ReadHalf` is usually done using the convenience methods found on the +/// [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct ReadHalf<'a>(&'a UnixStream); -/// Write half of a `UnixStream`. +/// Borrowed write half of a [`UnixStream`], created by [`split`]. +/// +/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will +/// shut down the UnixStream stream in the write direction. +/// +/// Writing to an `WriteHalf` is usually done using the convenience methods found +/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct WriteHalf<'a>(&'a UnixStream); diff --git a/tokio/src/net/unix/split_owned.rs b/tokio/src/net/unix/split_owned.rs new file mode 100644 index 00000000..eb35304b --- /dev/null +++ b/tokio/src/net/unix/split_owned.rs @@ -0,0 +1,187 @@ +//! `UnixStream` owned split support. +//! +//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf` +//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements +//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`. +//! +//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized +//! split has no associated overhead and enforces all invariants at the type +//! level. + +use crate::io::{AsyncRead, AsyncWrite}; +use crate::net::UnixStream; + +use std::error::Error; +use std::mem::MaybeUninit; +use std::net::Shutdown; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{fmt, io}; + +/// Owned read half of a [`UnixStream`], created by [`into_split`]. +/// +/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found +/// on the [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedReadHalf { + inner: Arc<UnixStream>, +} + +/// Owned write half of a [`UnixStream`], created by [`into_split`]. +/// +/// Note that in the [`AsyncWrite`] implementation of this type, +/// [`poll_shutdown`] will shut down the stream in the write direction. +/// Dropping the write half will also shut down the write half of the stream. +/// +/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods +/// found on the [`AsyncWriteExt`] trait. Examples import this trait through +/// [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedWriteHalf { + inner: Arc<UnixStream>, + shutdown_on_drop: bool, +} + +pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) { + let arc = Arc::new(stream); + let read = OwnedReadHalf { + inner: Arc::clone(&arc), + }; + let write = OwnedWriteHalf { + inner: arc, + shutdown_on_drop: true, + }; + (read, write) +} + +pub(crate) fn reunite( + read: OwnedReadHalf, + write: OwnedWriteHalf, +) -> Result<UnixStream, ReuniteError> { + if Arc::ptr_eq(&read.inner, &write.inner) { + write.forget(); + // This unwrap cannot fail as the api does not allow creating more than two Arcs, + // and we just dropped the other half. + Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(read, write)) + } +} + +/// Error indicating that two halves were not from the same socket, and thus could +/// not be reunited. +#[derive(Debug)] +pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +impl OwnedReadHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> { + reunite(self, other) + } +} + +impl AsyncRead for OwnedReadHalf { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.inner.poll_read_priv(cx, buf) + } +} + +impl OwnedWriteHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> { + reunite(other, self) + } + + /// Destroy the write half, but don't close the write half of the stream + /// until the read half is dropped. If the read half has already been + /// dropped, this closes the stream. + pub fn forget(mut self) { + self.shutdown_on_drop = false; + drop(self); + } +} + +impl Drop for OwnedWriteHalf { + fn drop(&mut self) { + if self.shutdown_on_drop { + let _ = self.inner.shutdown(Shutdown::Write); + } + } +} + +impl AsyncWrite for OwnedWriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner.poll_write_priv(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + // flush is a no-op + Poll::Ready(Ok(())) + } + + // `poll_shutdown` on a write half shutdowns the stream in the "write" direction. + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + let res = self.inner.shutdown(Shutdown::Write); + if res.is_ok() { + Pi |