diff options
author | Alice Ryhl <alice@ryhl.io> | 2020-07-24 21:56:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-24 12:56:38 -0700 |
commit | 4fca1974e9d9f95fab7d723619294cb4b2dcebbb (patch) | |
tree | b44bd16a594adff645d8342656c3cdfcff0c54f3 /tokio/src/net/unix | |
parent | 08872c55d161cac08f4feb3e141883a47ab766cf (diff) |
net: ensure that unix sockets have both split and into_split (#2687)
The documentation build failed with errors such as
error: `[read]` public documentation for `take` links to a private item
--> tokio/src/io/util/async_read_ext.rs:1078:9
|
1078 | / /// Creates an adaptor which reads at most `limit` bytes from it.
1079 | | ///
1080 | | /// This function returns a new instance of `AsyncRead` which will read
1081 | | /// at most `limit` bytes, after which it will always return EOF
... |
1103 | | /// }
1104 | | /// ```
| |_______________^
|
note: the lint level is defined here
--> tokio/src/lib.rs:13:9
|
13 | #![deny(intra_doc_link_resolution_failure)]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
= note: the link appears in this line:
bytes read and future calls to [`read()`][read] may succeed.
Diffstat (limited to 'tokio/src/net/unix')
-rw-r--r-- | tokio/src/net/unix/datagram/mod.rs | 8 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs (renamed from tokio/src/net/unix/datagram.rs) | 159 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split.rs | 68 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split_owned.rs | 148 | ||||
-rw-r--r-- | tokio/src/net/unix/mod.rs | 6 | ||||
-rw-r--r-- | tokio/src/net/unix/split.rs | 25 | ||||
-rw-r--r-- | tokio/src/net/unix/split_owned.rs | 187 | ||||
-rw-r--r-- | tokio/src/net/unix/stream.rs | 26 |
8 files changed, 489 insertions, 138 deletions
diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs new file mode 100644 index 00000000..f484ae34 --- /dev/null +++ b/tokio/src/net/unix/datagram/mod.rs @@ -0,0 +1,8 @@ +//! Unix datagram types. + +pub(crate) mod socket; +pub(crate) mod split; +pub(crate) mod split_owned; + +pub use split::{RecvHalf, SendHalf}; +pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; diff --git a/tokio/src/net/unix/datagram.rs b/tokio/src/net/unix/datagram/socket.rs index de450e24..2fe5654e 100644 --- a/tokio/src/net/unix/datagram.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,15 +1,15 @@ use crate::future::poll_fn; use crate::io::PollEvented; +use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; +use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; use std::convert::TryFrom; -use std::error::Error; use std::fmt; use std::io; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; -use std::sync::Arc; use std::task::{Context, Poll}; cfg_uds! { @@ -204,8 +204,31 @@ impl UnixDatagram { self.io.get_ref().shutdown(how) } + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receice and send the datagram concurrently. + /// to receive and send the datagram concurrently. + /// + /// This method is more efficient than [`into_split`], but the halves cannot + /// be moved into independently spawned tasks. + /// + /// [`into_split`]: fn@crate::net::UnixDatagram::into_split + pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) { + split(self) + } + + /// Split a `UnixDatagram` into a receive half and a send half, which can be used + /// to receive and send the datagram concurrently. + /// + /// Unlike [`split`], the owned halves can be moved to separate tasks, + /// however this comes at the cost of a heap allocation. + /// + /// **Note:** Dropping the write half will shut down the write half of the + /// datagram. This is equivalent to calling [`shutdown(Write)`]. + /// + /// [`split`]: fn@crate::net::UnixDatagram::split + /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { split_owned(self) } @@ -248,133 +271,3 @@ impl AsRawFd for UnixDatagram { self.io.get_ref().as_raw_fd() } } - -fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { - let shared = Arc::new(socket); - let send = shared.clone(); - let recv = shared; - ( - OwnedRecvHalf { inner: recv }, - OwnedSendHalf { - inner: send, - shutdown_on_drop: true, - }, - ) -} - -/// The send half after [`split`](UnixDatagram::into_split). -/// -/// Use [`send_to`](#method.send_to) or [`send`](#method.send) to send -/// datagrams. -#[derive(Debug)] -pub struct OwnedSendHalf { - inner: Arc<UnixDatagram>, - shutdown_on_drop: bool, -} - -/// The recv half after [`split`](UnixDatagram::into_split). -/// -/// Use [`recv_from`](#method.recv_from) or [`recv`](#method.recv) to receive -/// datagrams. -#[derive(Debug)] -pub struct OwnedRecvHalf { - inner: Arc<UnixDatagram>, -} - -/// Error indicating two halves were not from the same socket, and thus could -/// not be `reunite`d. -#[derive(Debug)] -pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); - -impl fmt::Display for ReuniteError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "tried to reunite halves that are not from the same socket" - ) - } -} - -impl Error for ReuniteError {} - -fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { - if Arc::ptr_eq(&s.inner, &r.inner) { - s.forget(); - // Only two instances of the `Arc` are ever created, one for the - // receiver and one for the sender, and those `Arc`s are never exposed - // externally. And so when we drop one here, the other one must be the - // only remaining one. - Ok(Arc::try_unwrap(r.inner).expect("unixdatagram: try_unwrap failed in reunite")) - } else { - Err(ReuniteError(s, r)) - } -} - -impl OwnedRecvHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UnixDatagram::split`. - pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> { - reunite(other, self) - } - - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await - } -} - -impl OwnedSendHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UnixDatagram::split`. - pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { - reunite(self, other) - } - - /// Sends data on the socket to the specified address. - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> - where - P: AsRef<Path> + Unpin, - { - poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await - } - - /// Destroy the send half, but don't close the stream until the recvice half - /// is dropped. If the read half has already been dropped, this closes the - /// stream. - pub fn forget(mut self) { - self.shutdown_on_drop = false; - drop(self); - } -} - -impl Drop for OwnedSendHalf { - fn drop(&mut self) { - if self.shutdown_on_drop { - let _ = self.inner.shutdown(Shutdown::Both); - } - } -} - -impl AsRef<UnixDatagram> for OwnedSendHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} - -impl AsRef<UnixDatagram> for OwnedRecvHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} diff --git a/tokio/src/net/unix/datagram/split.rs b/tokio/src/net/unix/datagram/split.rs new file mode 100644 index 00000000..e42eeda8 --- /dev/null +++ b/tokio/src/net/unix/datagram/split.rs @@ -0,0 +1,68 @@ +//! `UnixDatagram` split support. +//! +//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the +//! `UnixDatagram::split` method. + +use crate::future::poll_fn; +use crate::net::UnixDatagram; + +use std::io; +use std::os::unix::net::SocketAddr; +use std::path::Path; + +/// Borrowed receive half of a [`UnixDatagram`], created by [`split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`split`]: crate::net::UnixDatagram::split() +#[derive(Debug)] +pub struct RecvHalf<'a>(&'a UnixDatagram); + +/// Borrowed send half of a [`UnixDatagram`], created by [`split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`split`]: crate::net::UnixDatagram::split() +#[derive(Debug)] +pub struct SendHalf<'a>(&'a UnixDatagram); + +pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) { + (RecvHalf(&*stream), SendHalf(&*stream)) +} + +impl RecvHalf<'_> { + /// Receives data from the socket. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await + } + + /// Receives data from the socket. + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await + } +} + +impl SendHalf<'_> { + /// Sends data on the socket to the specified address. + pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + where + P: AsRef<Path> + Unpin, + { + poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await + } + + /// Sends data on the socket to the socket's peer. + pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await + } +} + +impl AsRef<UnixDatagram> for RecvHalf<'_> { + fn as_ref(&self) -> &UnixDatagram { + self.0 + } +} + +impl AsRef<UnixDatagram> for SendHalf<'_> { + fn as_ref(&self) -> &UnixDatagram { + self.0 + } +} diff --git a/tokio/src/net/unix/datagram/split_owned.rs b/tokio/src/net/unix/datagram/split_owned.rs new file mode 100644 index 00000000..699771f3 --- /dev/null +++ b/tokio/src/net/unix/datagram/split_owned.rs @@ -0,0 +1,148 @@ +//! `UnixDatagram` owned split support. +//! +//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf` +//! with the `UnixDatagram::into_split` method. + +use crate::future::poll_fn; +use crate::net::UnixDatagram; + +use std::error::Error; +use std::net::Shutdown; +use std::os::unix::net::SocketAddr; +use std::path::Path; +use std::sync::Arc; +use std::{fmt, io}; + +pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { + let shared = Arc::new(socket); + let send = shared.clone(); + let recv = shared; + ( + OwnedRecvHalf { inner: recv }, + OwnedSendHalf { + inner: send, + shutdown_on_drop: true, + }, + ) +} + +/// Owned send half of a [`UnixDatagram`], created by [`into_split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`into_split`]: UnixDatagram::into_split() +#[derive(Debug)] +pub struct OwnedSendHalf { + inner: Arc<UnixDatagram>, + shutdown_on_drop: bool, +} + +/// Owned receive half of a [`UnixDatagram`], created by [`into_split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`into_split`]: UnixDatagram::into_split() +#[derive(Debug)] +pub struct OwnedRecvHalf { + inner: Arc<UnixDatagram>, +} + +/// Error indicating that two halves were not from the same socket, and thus could +/// not be `reunite`d. +#[derive(Debug)] +pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { + if Arc::ptr_eq(&s.inner, &r.inner) { + s.forget(); + // Only two instances of the `Arc` are ever created, one for the + // receiver and one for the sender, and those `Arc`s are never exposed + // externally. And so when we drop one here, the other one must be the + // only remaining one. + Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(s, r)) + } +} + +impl OwnedRecvHalf { + /// Attempts to put the two "halves" of a `UnixDatagram` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: UnixDatagram::into_split() + pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> { + reunite(other, self) + } + + /// Receives data from the socket. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await + } + + /// Receives data from the socket. + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await + } +} + +impl OwnedSendHalf { + /// Attempts to put the two "halves" of a `UnixDatagram` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: UnixDatagram::into_split() + pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> { + reunite(self, other) + } + + /// Sends data on the socket to the specified address. + pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + where + P: AsRef<Path> + Unpin, + { + poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await + } + + /// Sends data on the socket to the socket's peer. + pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { + poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await + } + + /// Destroy the send half, but don't close the send half of the stream + /// until the receive half is dropped. If the read half has already been + /// dropped, this closes the stream. + pub fn forget(mut self) { + self.shutdown_on_drop = false; + drop(self); + } +} + +impl Drop for OwnedSendHalf { + fn drop(&mut self) { + if self.shutdown_on_drop { + let _ = self.inner.shutdown(Shutdown::Write); + } + } +} + +impl AsRef<UnixDatagram> for OwnedSendHalf { + fn as_ref(&self) -> &UnixDatagram { + &self.inner + } +} + +impl AsRef<UnixDatagram> for OwnedRecvHalf { + fn as_ref(&self) -> &UnixDatagram { + &self.inner + } +} diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index f063b74b..b079fe04 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -1,7 +1,6 @@ //! Unix domain socket utility types -pub(crate) mod datagram; -pub use datagram::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; +pub mod datagram; mod incoming; pub use incoming::Incoming; @@ -12,6 +11,9 @@ pub(crate) use listener::UnixListener; mod split; pub use split::{ReadHalf, WriteHalf}; +mod split_owned; +pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError}; + pub(crate) mod stream; pub(crate) use stream::UnixStream; diff --git a/tokio/src/net/unix/split.rs b/tokio/src/net/unix/split.rs index 9b9fa5ee..4fd85774 100644 --- a/tokio/src/net/unix/split.rs +++ b/tokio/src/net/unix/split.rs @@ -17,11 +17,32 @@ use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; -/// Read half of a `UnixStream`. +/// Borrowed read half of a [`UnixStream`], created by [`split`]. +/// +/// Reading from a `ReadHalf` is usually done using the convenience methods found on the +/// [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct ReadHalf<'a>(&'a UnixStream); -/// Write half of a `UnixStream`. +/// Borrowed write half of a [`UnixStream`], created by [`split`]. +/// +/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will +/// shut down the UnixStream stream in the write direction. +/// +/// Writing to an `WriteHalf` is usually done using the convenience methods found +/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct WriteHalf<'a>(&'a UnixStream); diff --git a/tokio/src/net/unix/split_owned.rs b/tokio/src/net/unix/split_owned.rs new file mode 100644 index 00000000..eb35304b --- /dev/null +++ b/tokio/src/net/unix/split_owned.rs @@ -0,0 +1,187 @@ +//! `UnixStream` owned split support. +//! +//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf` +//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements +//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`. +//! +//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized +//! split has no associated overhead and enforces all invariants at the type +//! level. + +use crate::io::{AsyncRead, AsyncWrite}; +use crate::net::UnixStream; + +use std::error::Error; +use std::mem::MaybeUninit; +use std::net::Shutdown; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{fmt, io}; + +/// Owned read half of a [`UnixStream`], created by [`into_split`]. +/// +/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found +/// on the [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedReadHalf { + inner: Arc<UnixStream>, +} + +/// Owned write half of a [`UnixStream`], created by [`into_split`]. +/// +/// Note that in the [`AsyncWrite`] implementation of this type, +/// [`poll_shutdown`] will shut down the stream in the write direction. +/// Dropping the write half will also shut down the write half of the stream. +/// +/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods +/// found on the [`AsyncWriteExt`] trait. Examples import this trait through +/// [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedWriteHalf { + inner: Arc<UnixStream>, + shutdown_on_drop: bool, +} + +pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) { + let arc = Arc::new(stream); + let read = OwnedReadHalf { + inner: Arc::clone(&arc), + }; + let write = OwnedWriteHalf { + inner: arc, + shutdown_on_drop: true, + }; + (read, write) +} + +pub(crate) fn reunite( + read: OwnedReadHalf, + write: OwnedWriteHalf, +) -> Result<UnixStream, ReuniteError> { + if Arc::ptr_eq(&read.inner, &write.inner) { + write.forget(); + // This unwrap cannot fail as the api does not allow creating more than two Arcs, + // and we just dropped the other half. + Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(read, write)) + } +} + +/// Error indicating that two halves were not from the same socket, and thus could +/// not be reunited. +#[derive(Debug)] +pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +impl OwnedReadHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> { + reunite(self, other) + } +} + +impl AsyncRead for OwnedReadHalf { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.inner.poll_read_priv(cx, buf) + } +} + +impl OwnedWriteHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> { + reunite(other, self) + } + + /// Destroy the write half, but don't close the write half of the stream + /// until the read half is dropped. If the read half has already been + /// dropped, this closes the stream. + pub fn forget(mut self) { + self.shutdown_on_drop = false; + drop(self); + } +} + +impl Drop for OwnedWriteHalf { + fn drop(&mut self) { + if self.shutdown_on_drop { + let _ = self.inner.shutdown(Shutdown::Write); + } + } +} + +impl AsyncWrite for OwnedWriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner.poll_write_priv(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + // flush is a no-op + Poll::Ready(Ok(())) + } + + // `poll_shutdown` on a write half shutdowns the stream in the "write" direction. + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + let res = self.inner.shutdown(Shutdown::Write); + if res.is_ok() { + Pin::into_inner(self).shutdown_on_drop = false; + } + res.into() + } +} + +impl AsRef<UnixStream> for OwnedReadHalf { + fn as_ref(&self) -> &UnixStream { + &*self.inner + } +} + +impl AsRef<UnixStream> for OwnedWriteHalf { + fn as_ref(&self) -> &UnixStream { + &*self.inner + } +} diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index beae6999..5fe242d0 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -1,6 +1,7 @@ use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, PollEvented}; use crate::net::unix::split::{split, ReadHalf, WriteHalf}; +use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::unix::ucred::{self, UCred}; use std::convert::TryFrom; @@ -109,11 +110,34 @@ impl UnixStream { self.io.get_ref().shutdown(how) } + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] /// Split a `UnixStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. - pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + /// + /// This method is more efficient than [`into_split`], but the halves cannot be + /// moved into independently spawned tasks. + /// + /// [`into_split`]: Self::into_split() + pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { split(self) } + + /// Splits a `UnixStream` into a read half and a write half, which can be used + /// to read and write the stream concurrently. + /// + /// Unlike [`split`], the owned halves can be moved to separate tasks, however + /// this comes at the cost of a heap allocation. + /// + /// **Note:** Dropping the write half will shut down the write half of the + /// stream. This is equivalent to calling [`shutdown(Write)`] on the `UnixStream`. + /// + /// [`split`]: Self::split() + /// [`shutdown(Write)`]: fn@Self::shutdown + pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { + split_owned(self) + } } impl TryFrom<UnixStream> for mio_uds::UnixStream { |