From 4fca1974e9d9f95fab7d723619294cb4b2dcebbb Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 24 Jul 2020 21:56:38 +0200 Subject: net: ensure that unix sockets have both split and into_split (#2687) The documentation build failed with errors such as error: `[read]` public documentation for `take` links to a private item --> tokio/src/io/util/async_read_ext.rs:1078:9 | 1078 | / /// Creates an adaptor which reads at most `limit` bytes from it. 1079 | | /// 1080 | | /// This function returns a new instance of `AsyncRead` which will read 1081 | | /// at most `limit` bytes, after which it will always return EOF ... | 1103 | | /// } 1104 | | /// ``` | |_______________^ | note: the lint level is defined here --> tokio/src/lib.rs:13:9 | 13 | #![deny(intra_doc_link_resolution_failure)] | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ = note: the link appears in this line: bytes read and future calls to [`read()`][read] may succeed. --- tokio/src/io/util/async_read_ext.rs | 10 +- tokio/src/io/util/async_write_ext.rs | 2 + tokio/src/net/mod.rs | 2 +- tokio/src/net/tcp/split.rs | 6 +- tokio/src/net/tcp/split_owned.rs | 21 +- tokio/src/net/tcp/stream.rs | 10 +- tokio/src/net/udp/split.rs | 2 +- tokio/src/net/unix/datagram.rs | 380 ----------------------------- tokio/src/net/unix/datagram/mod.rs | 8 + tokio/src/net/unix/datagram/socket.rs | 273 +++++++++++++++++++++ tokio/src/net/unix/datagram/split.rs | 68 ++++++ tokio/src/net/unix/datagram/split_owned.rs | 148 +++++++++++ tokio/src/net/unix/mod.rs | 6 +- tokio/src/net/unix/split.rs | 25 +- tokio/src/net/unix/split_owned.rs | 187 ++++++++++++++ tokio/src/net/unix/stream.rs | 26 +- tokio/src/sync/broadcast.rs | 1 + 17 files changed, 771 insertions(+), 404 deletions(-) delete mode 100644 tokio/src/net/unix/datagram.rs create mode 100644 tokio/src/net/unix/datagram/mod.rs create mode 100644 tokio/src/net/unix/datagram/socket.rs create mode 100644 tokio/src/net/unix/datagram/split.rs create mode 100644 tokio/src/net/unix/datagram/split_owned.rs create mode 100644 tokio/src/net/unix/split_owned.rs diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index e848a5d2..1ed7e6cf 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -986,10 +986,12 @@ cfg_io_util! { /// /// All bytes read from this source will be appended to the specified /// buffer `buf`. This function will continuously call [`read()`] to - /// append more data to `buf` until [`read()`][read] returns `Ok(0)`. + /// append more data to `buf` until [`read()`] returns `Ok(0)`. /// /// If successful, the total number of bytes read is returned. /// + /// [`read()`]: fn@crate::io::AsyncReadExt::read + /// /// # Errors /// /// If a read error is encountered then the `read_to_end` operation @@ -1018,7 +1020,7 @@ cfg_io_util! { /// (See also the [`tokio::fs::read`] convenience function for reading from a /// file.) /// - /// [`tokio::fs::read`]: crate::fs::read::read + /// [`tokio::fs::read`]: fn@crate::fs::read fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec) -> ReadToEnd<'a, Self> where Self: Unpin, @@ -1078,7 +1080,9 @@ cfg_io_util! { /// This function returns a new instance of `AsyncRead` which will read /// at most `limit` bytes, after which it will always return EOF /// (`Ok(0)`). Any read errors will not count towards the number of - /// bytes read and future calls to [`read()`][read] may succeed. + /// bytes read and future calls to [`read()`] may succeed. + /// + /// [`read()`]: fn@crate::io::AsyncReadExt::read /// /// # Examples /// diff --git a/tokio/src/io/util/async_write_ext.rs b/tokio/src/io/util/async_write_ext.rs index fa410974..321301e2 100644 --- a/tokio/src/io/util/async_write_ext.rs +++ b/tokio/src/io/util/async_write_ext.rs @@ -976,6 +976,8 @@ cfg_io_util! { /// no longer attempt to write to the stream. For example, the /// `TcpStream` implementation will issue a `shutdown(Write)` sys call. /// + /// [`flush`]: fn@crate::io::AsyncWriteExt::flush + /// /// # Examples /// /// ```no_run diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs index eb24ac0b..da6ad1fc 100644 --- a/tokio/src/net/mod.rs +++ b/tokio/src/net/mod.rs @@ -43,7 +43,7 @@ cfg_udp! { cfg_uds! { pub mod unix; - pub use unix::datagram::UnixDatagram; + pub use unix::datagram::socket::UnixDatagram; pub use unix::listener::UnixListener; pub use unix::stream::UnixStream; } diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 469056ac..0c1e359f 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -19,7 +19,7 @@ use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; -/// Read half of a [`TcpStream`], created by [`split`]. +/// Borrowed read half of a [`TcpStream`], created by [`split`]. /// /// Reading from a `ReadHalf` is usually done using the convenience methods found on the /// [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. @@ -31,12 +31,12 @@ use std::task::{Context, Poll}; #[derive(Debug)] pub struct ReadHalf<'a>(&'a TcpStream); -/// Write half of a [`TcpStream`], created by [`split`]. +/// Borrowed write half of a [`TcpStream`], created by [`split`]. /// /// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will /// shut down the TCP stream in the write direction. /// -/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods found +/// Writing to an `WriteHalf` is usually done using the convenience methods found /// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. /// /// [`TcpStream`]: TcpStream diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs index 3f6ee33f..6c2b9e69 100644 --- a/tokio/src/net/tcp/split_owned.rs +++ b/tokio/src/net/tcp/split_owned.rs @@ -37,10 +37,9 @@ pub struct OwnedReadHalf { /// Owned write half of a [`TcpStream`], created by [`into_split`]. /// -/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will -/// shut down the TCP stream in the write direction. -/// -/// Dropping the write half will shutdown the write half of the TCP stream. +/// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will +/// shut down the TCP stream in the write direction. Dropping the write half +/// will also shut down the write half of the TCP stream. /// /// Writing to an `OwnedWriteHalf` is usually done using the convenience methods found /// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. @@ -77,13 +76,13 @@ pub(crate) fn reunite( write.forget(); // This unwrap cannot fail as the api does not allow creating more than two Arcs, // and we just dropped the other half. - Ok(Arc::try_unwrap(read.inner).expect("Too many handles to Arc")) + Ok(Arc::try_unwrap(read.inner).expect("TcpStream: try_unwrap failed in reunite")) } else { Err(ReuniteError(read, write)) } } -/// Error indicating two halves were not from the same socket, and thus could +/// Error indicating that two halves were not from the same socket, and thus could /// not be reunited. #[derive(Debug)] pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf); @@ -210,7 +209,9 @@ impl OwnedWriteHalf { reunite(other, self) } - /// Drop the write half, but don't issue a TCP shutdown. + /// Destroy the write half, but don't close the write half of the stream + /// until the read half is dropped. If the read half has already been + /// dropped, this closes the stream. pub fn forget(mut self) { self.shutdown_on_drop = false; drop(self); @@ -250,7 +251,11 @@ impl AsyncWrite for OwnedWriteHalf { // `poll_shutdown` on a write half shutdowns the stream in the "write" direction. fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - self.inner.shutdown(Shutdown::Write).into() + let res = self.inner.shutdown(Shutdown::Write); + if res.is_ok() { + Pin::into_inner(self).shutdown_on_drop = false; + } + res.into() } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index cc81e116..02b52627 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -659,6 +659,9 @@ impl TcpStream { self.io.get_ref().set_linger(dur) } + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] /// Splits a `TcpStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. /// @@ -666,7 +669,7 @@ impl TcpStream { /// moved into independently spawned tasks. /// /// [`into_split`]: TcpStream::into_split() - pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { split(self) } @@ -676,10 +679,11 @@ impl TcpStream { /// Unlike [`split`], the owned halves can be moved to separate tasks, however /// this comes at the cost of a heap allocation. /// - /// **Note::** Dropping the write half will shutdown the write half of the TCP - /// stream. This is equivalent to calling `shutdown(Write)` on the `TcpStream`. + /// **Note:** Dropping the write half will shut down the write half of the TCP + /// stream. This is equivalent to calling [`shutdown(Write)`] on the `TcpStream`. /// /// [`split`]: TcpStream::split() + /// [`shutdown(Write)`]: fn@crate::net::TcpStream::shutdown pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { split_owned(self) } diff --git a/tokio/src/net/udp/split.rs b/tokio/src/net/udp/split.rs index e8d434aa..8d87f1c7 100644 --- a/tokio/src/net/udp/split.rs +++ b/tokio/src/net/udp/split.rs @@ -42,7 +42,7 @@ pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) { (RecvHalf(recv), SendHalf(send)) } -/// Error indicating two halves were not from the same socket, and thus could +/// Error indicating that two halves were not from the same socket, and thus could /// not be `reunite`d. #[derive(Debug)] pub struct ReuniteError(pub SendHalf, pub RecvHalf); diff --git a/tokio/src/net/unix/datagram.rs b/tokio/src/net/unix/datagram.rs deleted file mode 100644 index de450e24..00000000 --- a/tokio/src/net/unix/datagram.rs +++ /dev/null @@ -1,380 +0,0 @@ -use crate::future::poll_fn; -use crate::io::PollEvented; - -use std::convert::TryFrom; -use std::error::Error; -use std::fmt; -use std::io; -use std::net::Shutdown; -use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; -use std::path::Path; -use std::sync::Arc; -use std::task::{Context, Poll}; - -cfg_uds! { - /// An I/O object representing a Unix datagram socket. - pub struct UnixDatagram { - io: PollEvented, - } -} - -impl UnixDatagram { - /// Creates a new `UnixDatagram` bound to the specified path. - pub fn bind

(path: P) -> io::Result - where - P: AsRef, - { - let socket = mio_uds::UnixDatagram::bind(path)?; - UnixDatagram::new(socket) - } - - /// Creates an unnamed pair of connected sockets. - /// - /// This function will create a pair of interconnected Unix sockets for - /// communicating back and forth between one another. Each socket will - /// be associated with the default event loop's handle. - pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { - let (a, b) = mio_uds::UnixDatagram::pair()?; - let a = UnixDatagram::new(a)?; - let b = UnixDatagram::new(b)?; - - Ok((a, b)) - } - - /// Consumes a `UnixDatagram` in the standard library and returns a - /// nonblocking `UnixDatagram` from this crate. - /// - /// The returned datagram will be associated with the given event loop - /// specified by `handle` and is ready to perform I/O. - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. - pub fn from_std(datagram: net::UnixDatagram) -> io::Result { - let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; - let io = PollEvented::new(socket)?; - Ok(UnixDatagram { io }) - } - - fn new(socket: mio_uds::UnixDatagram) -> io::Result { - let io = PollEvented::new(socket)?; - Ok(UnixDatagram { io }) - } - - /// Creates a new `UnixDatagram` which is not bound to any address. - pub fn unbound() -> io::Result { - let socket = mio_uds::UnixDatagram::unbound()?; - UnixDatagram::new(socket) - } - - /// Connects the socket to the specified address. - /// - /// The `send` method may be used to send data to the specified address. - /// `recv` and `recv_from` will only receive data from that address. - pub fn connect>(&self, path: P) -> io::Result<()> { - self.io.get_ref().connect(path) - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await - } - - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await - } - - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Sends data on the socket to the specified address. - pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result - where - P: AsRef + Unpin, - { - poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - pub(crate) fn poll_send_to_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &Path, - ) -> Poll> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await - } - - pub(crate) fn poll_recv_from_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - - /// Returns the local address that this socket is bound to. - pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() - } - - /// Returns the address of this socket's peer. - /// - /// The `connect` method will connect the socket to a peer. - pub fn peer_addr(&self) -> io::Result { - self.io.get_ref().peer_addr() - } - - /// Returns the value of the `SO_ERROR` option. - pub fn take_error(&self) -> io::Result> { - self.io.get_ref().take_error() - } - - /// Shuts down the read, write, or both halves of this connection. - /// - /// This function will cause all pending and future I/O calls on the - /// specified portions to immediately return with an appropriate value - /// (see the documentation of `Shutdown`). - pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io.get_ref().shutdown(how) - } - - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receice and send the datagram concurrently. - pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { - split_owned(self) - } -} - -impl TryFrom for mio_uds::UnixDatagram { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner - fn try_from(value: UnixDatagram) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for UnixDatagram { - type Error = io::Error; - - /// Consumes stream, returning the tokio I/O object. - /// - /// This is equivalent to - /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). - fn try_from(stream: net::UnixDatagram) -> Result { - Self::from_std(stream) - } -} - -impl fmt::Debug for UnixDatagram { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) - } -} - -impl AsRawFd for UnixDatagram { - fn as_raw_fd(&self) -> RawFd { - self.io.get_ref().as_raw_fd() - } -} - -fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { - let shared = Arc::new(socket); - let send = shared.clone(); - let recv = shared; - ( - OwnedRecvHalf { inner: recv }, - OwnedSendHalf { - inner: send, - shutdown_on_drop: true, - }, - ) -} - -/// The send half after [`split`](UnixDatagram::into_split). -/// -/// Use [`send_to`](#method.send_to) or [`send`](#method.send) to send -/// datagrams. -#[derive(Debug)] -pub struct OwnedSendHalf { - inner: Arc, - shutdown_on_drop: bool, -} - -/// The recv half after [`split`](UnixDatagram::into_split). -/// -/// Use [`recv_from`](#method.recv_from) or [`recv`](#method.recv) to receive -/// datagrams. -#[derive(Debug)] -pub struct OwnedRecvHalf { - inner: Arc, -} - -/// Error indicating two halves were not from the same socket, and thus could -/// not be `reunite`d. -#[derive(Debug)] -pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); - -impl fmt::Display for ReuniteError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "tried to reunite halves that are not from the same socket" - ) - } -} - -impl Error for ReuniteError {} - -fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result { - if Arc::ptr_eq(&s.inner, &r.inner) { - s.forget(); - // Only two instances of the `Arc` are ever created, one for the - // receiver and one for the sender, and those `Arc`s are never exposed - // externally. And so when we drop one here, the other one must be the - // only remaining one. - Ok(Arc::try_unwrap(r.inner).expect("unixdatagram: try_unwrap failed in reunite")) - } else { - Err(ReuniteError(s, r)) - } -} - -impl OwnedRecvHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UnixDatagram::split`. - pub fn reunite(self, other: OwnedSendHalf) -> Result { - reunite(other, self) - } - - /// Receives data from the socket. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await - } - - /// Receives data from the socket. - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await - } -} - -impl OwnedSendHalf { - /// Attempts to put the two "halves" of a `UnixDatagram` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UnixDatagram::split`. - pub fn reunite(self, other: OwnedRecvHalf) -> Result { - reunite(self, other) - } - - /// Sends data on the socket to the specified address. - pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result - where - P: AsRef + Unpin, - { - poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - /// Sends data on the socket to the socket's peer. - pub async fn send(&mut self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await - } - - /// Destroy the send half, but don't close the stream until the recvice half - /// is dropped. If the read half has already been dropped, this closes the - /// stream. - pub fn forget(mut self) { - self.shutdown_on_drop = false; - drop(self); - } -} - -impl Drop for OwnedSendHalf { - fn drop(&mut self) { - if self.shutdown_on_drop { - let _ = self.inner.shutdown(Shutdown::Both); - } - } -} - -impl AsRef for OwnedSendHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} - -impl AsRef for OwnedRecvHalf { - fn as_ref(&self) -> &UnixDatagram { - &self.inner - } -} diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs new file mode 100644 index 00000000..f484ae34 --- /dev/null +++ b/tokio/src/net/unix/datagram/mod.rs @@ -0,0 +1,8 @@ +//! Unix datagram types. + +pub(crate) mod socket; +pub(crate) mod split; +pub(crate) mod split_owned; + +pub use split::{RecvHalf, SendHalf}; +pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs new file mode 100644 index 00000000..2fe5654e --- /dev/null +++ b/tokio/src/net/unix/datagram/socket.rs @@ -0,0 +1,273 @@ +use crate::future::poll_fn; +use crate::io::PollEvented; +use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; +use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; + +use std::convert::TryFrom; +use std::fmt; +use std::io; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; +use std::task::{Context, Poll}; + +cfg_uds! { + /// An I/O object representing a Unix datagram socket. + pub struct UnixDatagram { + io: PollEvented, + } +} + +impl UnixDatagram { + /// Creates a new `UnixDatagram` bound to the specified path. + pub fn bind

(path: P) -> io::Result + where + P: AsRef, + { + let socket = mio_uds::UnixDatagram::bind(path)?; + UnixDatagram::new(socket) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected Unix sockets for + /// communicating back and forth between one another. Each socket will + /// be associated with the default event loop's handle. + pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { + let (a, b) = mio_uds::UnixDatagram::pair()?; + let a = UnixDatagram::new(a)?; + let b = UnixDatagram::new(b)?; + + Ok((a, b)) + } + + /// Consumes a `UnixDatagram` in the standard library and returns a + /// nonblocking `UnixDatagram` from this crate. + /// + /// The returned datagram will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + pub fn from_std(datagram: net::UnixDatagram) -> io::Result { + let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; + let io = PollEvented::new(socket)?; + Ok(UnixDatagram { io }) + } + + fn new(socket: mio_uds::UnixDatagram) -> io::Result { + let io = PollEvented::new(socket)?; + Ok(UnixDatagram { io }) + } + + /// Creates a new `UnixDatagram` which is not bound to any address. + pub fn unbound() -> io::Result { + let socket = mio_uds::UnixDatagram::unbound()?; + UnixDatagram::new(socket) + } + + /// Connects the socket to the specified address. + /// + /// The `send` method may be used to send data to the specified address. + /// `recv` and `recv_from` will only receive data from that address. + pub fn connect>(&self, path: P) -> io::Result<()> { + self.io.get_ref().connect(path) + } + + /// Sends data on the socket to the socket's peer. + pub async fn send(&mut self, buf: &[u8]) -> io::Result { + poll_fn(|cx| self.poll_send_priv(cx, buf)).await + } + + // Poll IO functions that takes `&self` are provided for the split API. + // + // They are not public because (taken from the doc of `PollEvented`): + // + // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the + // caller must ensure that there are at most two tasks that use a + // `PollEvented` instance concurrently. One for reading and one for writing. + // While violating this requirement is "safe" from a Rust memory model point + // of view, it will result in unexpected behavior in the form of lost + // notifications and tasks hanging. + pub(crate) fn poll_send_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().send(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Receives data from the socket. + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { + poll_fn(|cx| self.poll_recv_priv(cx, buf)).await + } + + pub(crate) fn poll_recv_priv( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().recv(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Sends data on the socket to the specified address. + pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result + where + P: AsRef + Unpin, + { + poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await + } + + pub(crate) fn poll_send_to_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + target: &Path, + ) -> Poll> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().send_to(buf, target) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Receives data from the socket. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await + } + + pub(crate) fn poll_recv_from_priv( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().recv_from(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + /// Returns the local address that this socket is bound to. + pub fn local_addr(&self) -> io::Result { + self.io.get_ref().local_addr() + } + + /// Returns the address of this socket's peer. + /// + /// The `connect` method will connect the socket to a peer. + pub fn peer_addr(&self) -> io::Result { + self.io.get_ref().peer_addr() + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.io.get_ref().take_error() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } + + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] + /// Split a `UnixDatagram` into a receive half and a send half, which can be used + /// to receive and send the datagram concurrently. + /// + /// This method is more efficient than [`into_split`], but the halves cannot + /// be moved into independently spawned tasks. + /// + /// [`into_split`]: fn@crate::net::UnixDatagram::into_split + pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) { + split(self) + } + + /// Split a `UnixDatagram` into a receive half and a send half, which can be used + /// to receive and send the datagram concurrently. + /// + /// Unlike [`split`], the owned halves can be moved to separate tasks, + /// however this comes at the cost of a heap allocation. + /// + /// **Note:** Dropping the write half will shut down the write half of the + /// datagram. This is equivalent to calling [`shutdown(Write)`]. + /// + /// [`split`]: fn@crate::net::UnixDatagram::split + /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown + pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { + split_owned(self) + } +} + +impl TryFrom for mio_uds::UnixDatagram { + type Error = io::Error; + + /// Consumes value, returning the mio I/O object. + /// + /// See [`PollEvented::into_inner`] for more details about + /// resource deregistration that happens during the call. + /// + /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner + fn try_from(value: UnixDatagram) -> Result { + value.io.into_inner() + } +} + +impl TryFrom for UnixDatagram { + type Error = io::Error; + + /// Consumes stream, returning the tokio I/O object. + /// + /// This is equivalent to + /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). + fn try_from(stream: net::UnixDatagram) -> Result { + Self::from_std(stream) + } +} + +impl fmt::Debug for UnixDatagram { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixDatagram { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} diff --git a/tokio/src/net/unix/datagram/split.rs b/tokio/src/net/unix/datagram/split.rs new file mode 100644 index 00000000..e42eeda8 --- /dev/null +++ b/tokio/src/net/unix/datagram/split.rs @@ -0,0 +1,68 @@ +//! `UnixDatagram` split support. +//! +//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the +//! `UnixDatagram::split` method. + +use crate::future::poll_fn; +use crate::net::UnixDatagram; + +use std::io; +use std::os::unix::net::SocketAddr; +use std::path::Path; + +/// Borrowed receive half of a [`UnixDatagram`], created by [`split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`split`]: crate::net::UnixDatagram::split() +#[derive(Debug)] +pub struct RecvHalf<'a>(&'a UnixDatagram); + +/// Borrowed send half of a [`UnixDatagram`], created by [`split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`split`]: crate::net::UnixDatagram::split() +#[derive(Debug)] +pub struct SendHalf<'a>(&'a UnixDatagram); + +pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) { + (RecvHalf(&*stream), SendHalf(&*stream)) +} + +impl RecvHalf<'_> { + /// Receives data from the socket. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await + } + + /// Receives data from the socket. + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { + poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await + } +} + +impl SendHalf<'_> { + /// Sends data on the socket to the specified address. + pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result + where + P: AsRef + Unpin, + { + poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await + } + + /// Sends data on the socket to the socket's peer. + pub async fn send(&mut self, buf: &[u8]) -> io::Result { + poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await + } +} + +impl AsRef for RecvHalf<'_> { + fn as_ref(&self) -> &UnixDatagram { + self.0 + } +} + +impl AsRef for SendHalf<'_> { + fn as_ref(&self) -> &UnixDatagram { + self.0 + } +} diff --git a/tokio/src/net/unix/datagram/split_owned.rs b/tokio/src/net/unix/datagram/split_owned.rs new file mode 100644 index 00000000..699771f3 --- /dev/null +++ b/tokio/src/net/unix/datagram/split_owned.rs @@ -0,0 +1,148 @@ +//! `UnixDatagram` owned split support. +//! +//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf` +//! with the `UnixDatagram::into_split` method. + +use crate::future::poll_fn; +use crate::net::UnixDatagram; + +use std::error::Error; +use std::net::Shutdown; +use std::os::unix::net::SocketAddr; +use std::path::Path; +use std::sync::Arc; +use std::{fmt, io}; + +pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) { + let shared = Arc::new(socket); + let send = shared.clone(); + let recv = shared; + ( + OwnedRecvHalf { inner: recv }, + OwnedSendHalf { + inner: send, + shutdown_on_drop: true, + }, + ) +} + +/// Owned send half of a [`UnixDatagram`], created by [`into_split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`into_split`]: UnixDatagram::into_split() +#[derive(Debug)] +pub struct OwnedSendHalf { + inner: Arc, + shutdown_on_drop: bool, +} + +/// Owned receive half of a [`UnixDatagram`], created by [`into_split`]. +/// +/// [`UnixDatagram`]: UnixDatagram +/// [`into_split`]: UnixDatagram::into_split() +#[derive(Debug)] +pub struct OwnedRecvHalf { + inner: Arc, +} + +/// Error indicating that two halves were not from the same socket, and thus could +/// not be `reunite`d. +#[derive(Debug)] +pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result { + if Arc::ptr_eq(&s.inner, &r.inner) { + s.forget(); + // Only two instances of the `Arc` are ever created, one for the + // receiver and one for the sender, and those `Arc`s are never exposed + // externally. And so when we drop one here, the other one must be the + // only remaining one. + Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(s, r)) + } +} + +impl OwnedRecvHalf { + /// Attempts to put the two "halves" of a `UnixDatagram` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: UnixDatagram::into_split() + pub fn reunite(self, other: OwnedSendHalf) -> Result { + reunite(other, self) + } + + /// Receives data from the socket. + pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await + } + + /// Receives data from the socket. + pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { + poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await + } +} + +impl OwnedSendHalf { + /// Attempts to put the two "halves" of a `UnixDatagram` back together and + /// recover the original socket. Succeeds only if the two "halves" + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: UnixDatagram::into_split() + pub fn reunite(self, other: OwnedRecvHalf) -> Result { + reunite(self, other) + } + + /// Sends data on the socket to the specified address. + pub async fn send_to

(&mut self, buf: &[u8], target: P) -> io::Result + where + P: AsRef + Unpin, + { + poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await + } + + /// Sends data on the socket to the socket's peer. + pub async fn send(&mut self, buf: &[u8]) -> io::Result { + poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await + } + + /// Destroy the send half, but don't close the send half of the stream + /// until the receive half is dropped. If the read half has already been + /// dropped, this closes the stream. + pub fn forget(mut self) { + self.shutdown_on_drop = false; + drop(self); + } +} + +impl Drop for OwnedSendHalf { + fn drop(&mut self) { + if self.shutdown_on_drop { + let _ = self.inner.shutdown(Shutdown::Write); + } + } +} + +impl AsRef for OwnedSendHalf { + fn as_ref(&self) -> &UnixDatagram { + &self.inner + } +} + +impl AsRef for OwnedRecvHalf { + fn as_ref(&self) -> &UnixDatagram { + &self.inner + } +} diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index f063b74b..b079fe04 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -1,7 +1,6 @@ //! Unix domain socket utility types -pub(crate) mod datagram; -pub use datagram::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; +pub mod datagram; mod incoming; pub use incoming::Incoming; @@ -12,6 +11,9 @@ pub(crate) use listener::UnixListener; mod split; pub use split::{ReadHalf, WriteHalf}; +mod split_owned; +pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError}; + pub(crate) mod stream; pub(crate) use stream::UnixStream; diff --git a/tokio/src/net/unix/split.rs b/tokio/src/net/unix/split.rs index 9b9fa5ee..4fd85774 100644 --- a/tokio/src/net/unix/split.rs +++ b/tokio/src/net/unix/split.rs @@ -17,11 +17,32 @@ use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; -/// Read half of a `UnixStream`. +/// Borrowed read half of a [`UnixStream`], created by [`split`]. +/// +/// Reading from a `ReadHalf` is usually done using the convenience methods found on the +/// [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct ReadHalf<'a>(&'a UnixStream); -/// Write half of a `UnixStream`. +/// Borrowed write half of a [`UnixStream`], created by [`split`]. +/// +/// Note that in the [`AsyncWrite`] implemenation of this type, [`poll_shutdown`] will +/// shut down the UnixStream stream in the write direction. +/// +/// Writing to an `WriteHalf` is usually done using the convenience methods found +/// on the [`AsyncWriteExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude #[derive(Debug)] pub struct WriteHalf<'a>(&'a UnixStream); diff --git a/tokio/src/net/unix/split_owned.rs b/tokio/src/net/unix/split_owned.rs new file mode 100644 index 00000000..eb35304b --- /dev/null +++ b/tokio/src/net/unix/split_owned.rs @@ -0,0 +1,187 @@ +//! `UnixStream` owned split support. +//! +//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf` +//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements +//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`. +//! +//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized +//! split has no associated overhead and enforces all invariants at the type +//! level. + +use crate::io::{AsyncRead, AsyncWrite}; +use crate::net::UnixStream; + +use std::error::Error; +use std::mem::MaybeUninit; +use std::net::Shutdown; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{fmt, io}; + +/// Owned read half of a [`UnixStream`], created by [`into_split`]. +/// +/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found +/// on the [`AsyncReadExt`] trait. Examples import this trait through [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedReadHalf { + inner: Arc, +} + +/// Owned write half of a [`UnixStream`], created by [`into_split`]. +/// +/// Note that in the [`AsyncWrite`] implementation of this type, +/// [`poll_shutdown`] will shut down the stream in the write direction. +/// Dropping the write half will also shut down the write half of the stream. +/// +/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods +/// found on the [`AsyncWriteExt`] trait. Examples import this trait through +/// [the prelude]. +/// +/// [`UnixStream`]: crate::net::UnixStream +/// [`into_split`]: crate::net::UnixStream::into_split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +/// [the prelude]: crate::prelude +#[derive(Debug)] +pub struct OwnedWriteHalf { + inner: Arc, + shutdown_on_drop: bool, +} + +pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) { + let arc = Arc::new(stream); + let read = OwnedReadHalf { + inner: Arc::clone(&arc), + }; + let write = OwnedWriteHalf { + inner: arc, + shutdown_on_drop: true, + }; + (read, write) +} + +pub(crate) fn reunite( + read: OwnedReadHalf, + write: OwnedWriteHalf, +) -> Result { + if Arc::ptr_eq(&read.inner, &write.inner) { + write.forget(); + // This unwrap cannot fail as the api does not allow creating more than two Arcs, + // and we just dropped the other half. + Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite")) + } else { + Err(ReuniteError(read, write)) + } +} + +/// Error indicating that two halves were not from the same socket, and thus could +/// not be reunited. +#[derive(Debug)] +pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf); + +impl fmt::Display for ReuniteError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "tried to reunite halves that are not from the same socket" + ) + } +} + +impl Error for ReuniteError {} + +impl OwnedReadHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedWriteHalf) -> Result { + reunite(self, other) + } +} + +impl AsyncRead for OwnedReadHalf { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.inner.poll_read_priv(cx, buf) + } +} + +impl OwnedWriteHalf { + /// Attempts to put the two halves of a `UnixStream` back together and + /// recover the original socket. Succeeds only if the two halves + /// originated from the same call to [`into_split`]. + /// + /// [`into_split`]: crate::net::UnixStream::into_split() + pub fn reunite(self, other: OwnedReadHalf) -> Result { + reunite(other, self) + } + + /// Destroy the write half, but don't close the write half of the stream + /// until the read half is dropped. If the read half has already been + /// dropped, this closes the stream. + pub fn forget(mut self) { + self.shutdown_on_drop = false; + drop(self); + } +} + +impl Drop for OwnedWriteHalf { + fn drop(&mut self) { + if self.shutdown_on_drop { + let _ = self.inner.shutdown(Shutdown::Write); + } + } +} + +impl AsyncWrite for OwnedWriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.inner.poll_write_priv(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + // flush is a no-op + Poll::Ready(Ok(())) + } + + // `poll_shutdown` on a write half shutdowns the stream in the "write" direction. + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + let res = self.inner.shutdown(Shutdown::Write); + if res.is_ok() { + Pin::into_inner(self).shutdown_on_drop = false; + } + res.into() + } +} + +impl AsRef for OwnedReadHalf { + fn as_ref(&self) -> &UnixStream { + &*self.inner + } +} + +impl AsRef for OwnedWriteHalf { + fn as_ref(&self) -> &UnixStream { + &*self.inner + } +} diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index beae6999..5fe242d0 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -1,6 +1,7 @@ use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, PollEvented}; use crate::net::unix::split::{split, ReadHalf, WriteHalf}; +use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::unix::ucred::{self, UCred}; use std::convert::TryFrom; @@ -109,11 +110,34 @@ impl UnixStream { self.io.get_ref().shutdown(how) } + // These lifetime markers also appear in the generated documentation, and make + // it more clear that this is a *borrowed* split. + #[allow(clippy::needless_lifetimes)] /// Split a `UnixStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. - pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + /// + /// This method is more efficient than [`into_split`], but the halves cannot be + /// moved into independently spawned tasks. + /// + /// [`into_split`]: Self::into_split() + pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { split(self) } + + /// Splits a `UnixStream` into a read half and a write half, which can be used + /// to read and write the stream concurrently. + /// + /// Unlike [`split`], the owned halves can be moved to separate tasks, however + /// this comes at the cost of a heap allocation. + /// + /// **Note:** Dropping the write half will shut down the write half of the + /// stream. This is equivalent to calling [`shutdown(Write)`] on the `UnixStream`. + /// + /// [`split`]: Self::split() + /// [`shutdown(Write)`]: fn@Self::shutdown + pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { + split_owned(self) + } } impl TryFrom for mio_uds::UnixStream { diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 0c8716f7..cd62ffd5 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -808,6 +808,7 @@ where /// receive, `Err(TryRecvError::Empty)` is returned. /// /// [`recv`]: crate::sync::broadcast::Receiver::recv + /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// [`Receiver`]: crate::sync::broadcast::Receiver /// /// # Examples -- cgit v1.2.3