diff options
author | Tomasz Miąsko <tomaszmiasko@gmail.com> | 2019-08-09 21:50:18 +0200 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2019-08-09 12:50:18 -0700 |
commit | 756606a58be2ad29c33752873d2c5c3c2f7a1d71 (patch) | |
tree | d22556b688cd1f0df3efe4b59d0bbab8864c635b | |
parent | e3b4c99a33d86f9825721bc3bc7f298adef094d3 (diff) |
uds: implement split and split_mut for UnixStream (#1395)
This mirrors split API available in TcpStream.
-rw-r--r-- | tokio-uds/src/lib.rs | 1 | ||||
-rw-r--r-- | tokio-uds/src/split.rs | 180 | ||||
-rw-r--r-- | tokio-uds/src/stream.rs | 22 | ||||
-rw-r--r-- | tokio-uds/tests/split.rs | 43 | ||||
-rw-r--r-- | tokio/src/net.rs | 2 |
5 files changed, 247 insertions, 1 deletions
diff --git a/tokio-uds/src/lib.rs b/tokio-uds/src/lib.rs index 2f8dc7cf..a2824241 100644 --- a/tokio-uds/src/lib.rs +++ b/tokio-uds/src/lib.rs @@ -12,6 +12,7 @@ mod datagram; // mod frame; mod incoming; mod listener; +pub mod split; mod stream; mod ucred; diff --git a/tokio-uds/src/split.rs b/tokio-uds/src/split.rs new file mode 100644 index 00000000..19f2458d --- /dev/null +++ b/tokio-uds/src/split.rs @@ -0,0 +1,180 @@ +//! `UnixStream` split support. +//! +//! A `UnixStream` can be split into a read half and a write half with `UnixStream::split` +//! and `UnixStream::split_mut` methods. The read half implements `AsyncRead` while +//! the write half implements `AsyncWrite`. The two halves can be used concurrently. +//! +//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized +//! split gives read and write halves that are faster and smaller, because they +//! do not use locks. They also provide access to the underlying `UnixStream` +//! after split, implementing `AsRef<UnixStream>`. This allows you to call +//! `UnixStream` methods that takes `&self`, e.g., to get local and peer +//! addresses, to get and set socket options, and to shutdown the sockets. + +use super::UnixStream; +use bytes::{Buf, BufMut}; +use std::io; +use std::net::Shutdown; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Read half of a `UnixStream`. +#[derive(Debug)] +pub struct UnixStreamReadHalf(Arc<UnixStream>); + +/// Write half of a `UnixStream`. +/// +/// Note that in the `AsyncWrite` implementation of `UnixStreamWriteHalf`, +/// `poll_shutdown` actually shuts down the stream in the write direction. +#[derive(Debug)] +pub struct UnixStreamWriteHalf(Arc<UnixStream>); + +/// Read half of a `UnixStream`. +#[derive(Debug)] +pub struct UnixStreamReadHalfMut<'a>(&'a UnixStream); + +/// Write half of a `UnixStream`. +/// +/// Note that in the `AsyncWrite` implementation of `UnixStreamWriteHalfMut`, +/// `poll_shutdown` actually shuts down the stream in the write direction. +#[derive(Debug)] +pub struct UnixStreamWriteHalfMut<'a>(&'a UnixStream); + +pub(crate) fn split(stream: UnixStream) -> (UnixStreamReadHalf, UnixStreamWriteHalf) { + let shared = Arc::new(stream); + ( + UnixStreamReadHalf(shared.clone()), + UnixStreamWriteHalf(shared), + ) +} + +pub(crate) fn split_mut( + stream: &mut UnixStream, +) -> (UnixStreamReadHalfMut<'_>, UnixStreamWriteHalfMut<'_>) { + ( + UnixStreamReadHalfMut(stream), + UnixStreamWriteHalfMut(stream), + ) +} + +impl AsRef<UnixStream> for UnixStreamReadHalf { + fn as_ref(&self) -> &UnixStream { + &self.0 + } +} + +impl AsRef<UnixStream> for UnixStreamWriteHalf { + fn as_ref(&self) -> &UnixStream { + &self.0 + } +} + +impl AsRef<UnixStream> for UnixStreamReadHalfMut<'_> { + fn as_ref(&self) -> &UnixStream { + self.0 + } +} + +impl AsRef<UnixStream> for UnixStreamWriteHalfMut<'_> { + fn as_ref(&self) -> &UnixStream { + self.0 + } +} + +impl AsyncRead for UnixStreamReadHalf { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_read_priv(cx, buf) + } + + fn poll_read_buf<B: BufMut>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + self.0.poll_read_buf_priv(cx, buf) + } +} + +impl AsyncRead for UnixStreamReadHalfMut<'_> { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_read_priv(cx, buf) + } + + fn poll_read_buf<B: BufMut>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + self.0.poll_read_buf_priv(cx, buf) + } +} + +impl AsyncWrite for UnixStreamWriteHalf { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_write_priv(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + self.0.shutdown(Shutdown::Write).into() + } + + fn poll_write_buf<B: Buf>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + self.0.poll_write_buf_priv(cx, buf) + } +} + +impl AsyncWrite for UnixStreamWriteHalfMut<'_> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_write_priv(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + self.0.shutdown(Shutdown::Write).into() + } + + fn poll_write_buf<B: Buf>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + self.0.poll_write_buf_priv(cx, buf) + } +} diff --git a/tokio-uds/src/stream.rs b/tokio-uds/src/stream.rs index 9ac35a92..a19a80d8 100644 --- a/tokio-uds/src/stream.rs +++ b/tokio-uds/src/stream.rs @@ -1,3 +1,7 @@ +use crate::split::{ + split, split_mut, UnixStreamReadHalf, UnixStreamReadHalfMut, UnixStreamWriteHalf, + UnixStreamWriteHalfMut, +}; use crate::ucred::{self, UCred}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -102,6 +106,24 @@ impl UnixStream { pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.get_ref().shutdown(how) } + + /// Split a `UnixStream` into a read half and a write half, which can be used + /// to read and write the stream concurrently. + /// + /// See the module level documenation of [`split`](super::split) for more + /// details. + pub fn split(self) -> (UnixStreamReadHalf, UnixStreamWriteHalf) { + split(self) + } + + /// Split a `UnixStream` into a read half and a write half, which can be used + /// to read and write the stream concurrently. + /// + /// See the module level documenation of [`split`](super::split) for more + /// details. + pub fn split_mut(&mut self) -> (UnixStreamReadHalfMut<'_>, UnixStreamWriteHalfMut<'_>) { + split_mut(self) + } } impl TryFrom<UnixStream> for mio_uds::UnixStream { diff --git a/tokio-uds/tests/split.rs b/tokio-uds/tests/split.rs new file mode 100644 index 00000000..4afd7687 --- /dev/null +++ b/tokio-uds/tests/split.rs @@ -0,0 +1,43 @@ +#![cfg(unix)] +#![feature(async_await)] +#![deny(warnings, rust_2018_idioms)] + +use tokio::prelude::*; +use tokio_uds::UnixStream; + +/// Checks that `UnixStream` can be split into a read half and a write half using +/// `UnixStream::split` and `UnixStream::split_mut`. +/// +/// Verifies that the implementation of `AsyncWrite::poll_shutdown` shutdowns the stream for +/// writing by reading to the end of stream on the other side of the connection. +#[tokio::test] +async fn split() -> std::io::Result<()> { + let (a, mut b) = UnixStream::pair()?; + + let (mut a_read, mut a_write) = a.split(); + let (mut b_read, mut b_write) = b.split_mut(); + + let (a_response, b_response) = futures::future::try_join( + send_recv_all(&mut a_read, &mut a_write, b"A"), + send_recv_all(&mut b_read, &mut b_write, b"B"), + ) + .await?; + + assert_eq!(a_response, b"B"); + assert_eq!(b_response, b"A"); + + Ok(()) +} + +async fn send_recv_all( + read: &mut (dyn AsyncRead + Unpin), + write: &mut (dyn AsyncWrite + Unpin), + input: &[u8], +) -> std::io::Result<Vec<u8>> { + write.write_all(input).await?; + write.shutdown().await?; + + let mut output = Vec::new(); + read.read_to_end(&mut output).await?; + Ok(output) +} diff --git a/tokio/src/net.rs b/tokio/src/net.rs index ba5157e3..74ee97fa 100644 --- a/tokio/src/net.rs +++ b/tokio/src/net.rs @@ -67,7 +67,7 @@ pub use self::udp::UdpSocket; pub mod unix { //! Unix domain socket bindings for `tokio` (only available on unix systems). - pub use tokio_uds::{UCred, UnixDatagram, UnixListener, UnixStream}; + pub use tokio_uds::{split, UCred, UnixDatagram, UnixListener, UnixStream}; } #[cfg(all(unix, feature = "uds"))] pub use self::unix::{UnixDatagram, UnixListener, UnixStream}; |