summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Miąsko <tomaszmiasko@gmail.com>2019-08-09 21:50:18 +0200
committerCarl Lerche <me@carllerche.com>2019-08-09 12:50:18 -0700
commit756606a58be2ad29c33752873d2c5c3c2f7a1d71 (patch)
treed22556b688cd1f0df3efe4b59d0bbab8864c635b
parente3b4c99a33d86f9825721bc3bc7f298adef094d3 (diff)
uds: implement split and split_mut for UnixStream (#1395)
This mirrors split API available in TcpStream.
-rw-r--r--tokio-uds/src/lib.rs1
-rw-r--r--tokio-uds/src/split.rs180
-rw-r--r--tokio-uds/src/stream.rs22
-rw-r--r--tokio-uds/tests/split.rs43
-rw-r--r--tokio/src/net.rs2
5 files changed, 247 insertions, 1 deletions
diff --git a/tokio-uds/src/lib.rs b/tokio-uds/src/lib.rs
index 2f8dc7cf..a2824241 100644
--- a/tokio-uds/src/lib.rs
+++ b/tokio-uds/src/lib.rs
@@ -12,6 +12,7 @@ mod datagram;
// mod frame;
mod incoming;
mod listener;
+pub mod split;
mod stream;
mod ucred;
diff --git a/tokio-uds/src/split.rs b/tokio-uds/src/split.rs
new file mode 100644
index 00000000..19f2458d
--- /dev/null
+++ b/tokio-uds/src/split.rs
@@ -0,0 +1,180 @@
+//! `UnixStream` split support.
+//!
+//! A `UnixStream` can be split into a read half and a write half with `UnixStream::split`
+//! and `UnixStream::split_mut` methods. The read half implements `AsyncRead` while
+//! the write half implements `AsyncWrite`. The two halves can be used concurrently.
+//!
+//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
+//! split gives read and write halves that are faster and smaller, because they
+//! do not use locks. They also provide access to the underlying `UnixStream`
+//! after split, implementing `AsRef<UnixStream>`. This allows you to call
+//! `UnixStream` methods that takes `&self`, e.g., to get local and peer
+//! addresses, to get and set socket options, and to shutdown the sockets.
+
+use super::UnixStream;
+use bytes::{Buf, BufMut};
+use std::io;
+use std::net::Shutdown;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio_io::{AsyncRead, AsyncWrite};
+
+/// Read half of a `UnixStream`.
+#[derive(Debug)]
+pub struct UnixStreamReadHalf(Arc<UnixStream>);
+
+/// Write half of a `UnixStream`.
+///
+/// Note that in the `AsyncWrite` implementation of `UnixStreamWriteHalf`,
+/// `poll_shutdown` actually shuts down the stream in the write direction.
+#[derive(Debug)]
+pub struct UnixStreamWriteHalf(Arc<UnixStream>);
+
+/// Read half of a `UnixStream`.
+#[derive(Debug)]
+pub struct UnixStreamReadHalfMut<'a>(&'a UnixStream);
+
+/// Write half of a `UnixStream`.
+///
+/// Note that in the `AsyncWrite` implementation of `UnixStreamWriteHalfMut`,
+/// `poll_shutdown` actually shuts down the stream in the write direction.
+#[derive(Debug)]
+pub struct UnixStreamWriteHalfMut<'a>(&'a UnixStream);
+
+pub(crate) fn split(stream: UnixStream) -> (UnixStreamReadHalf, UnixStreamWriteHalf) {
+ let shared = Arc::new(stream);
+ (
+ UnixStreamReadHalf(shared.clone()),
+ UnixStreamWriteHalf(shared),
+ )
+}
+
+pub(crate) fn split_mut(
+ stream: &mut UnixStream,
+) -> (UnixStreamReadHalfMut<'_>, UnixStreamWriteHalfMut<'_>) {
+ (
+ UnixStreamReadHalfMut(stream),
+ UnixStreamWriteHalfMut(stream),
+ )
+}
+
+impl AsRef<UnixStream> for UnixStreamReadHalf {
+ fn as_ref(&self) -> &UnixStream {
+ &self.0
+ }
+}
+
+impl AsRef<UnixStream> for UnixStreamWriteHalf {
+ fn as_ref(&self) -> &UnixStream {
+ &self.0
+ }
+}
+
+impl AsRef<UnixStream> for UnixStreamReadHalfMut<'_> {
+ fn as_ref(&self) -> &UnixStream {
+ self.0
+ }
+}
+
+impl AsRef<UnixStream> for UnixStreamWriteHalfMut<'_> {
+ fn as_ref(&self) -> &UnixStream {
+ self.0
+ }
+}
+
+impl AsyncRead for UnixStreamReadHalf {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+ false
+ }
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_read_priv(cx, buf)
+ }
+
+ fn poll_read_buf<B: BufMut>(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_read_buf_priv(cx, buf)
+ }
+}
+
+impl AsyncRead for UnixStreamReadHalfMut<'_> {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+ false
+ }
+
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_read_priv(cx, buf)
+ }
+
+ fn poll_read_buf<B: BufMut>(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_read_buf_priv(cx, buf)
+ }
+}
+
+impl AsyncWrite for UnixStreamWriteHalf {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_write_priv(cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.0.shutdown(Shutdown::Write).into()
+ }
+
+ fn poll_write_buf<B: Buf>(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_write_buf_priv(cx, buf)
+ }
+}
+
+impl AsyncWrite for UnixStreamWriteHalfMut<'_> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_write_priv(cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.0.shutdown(Shutdown::Write).into()
+ }
+
+ fn poll_write_buf<B: Buf>(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut B,
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_write_buf_priv(cx, buf)
+ }
+}
diff --git a/tokio-uds/src/stream.rs b/tokio-uds/src/stream.rs
index 9ac35a92..a19a80d8 100644
--- a/tokio-uds/src/stream.rs
+++ b/tokio-uds/src/stream.rs
@@ -1,3 +1,7 @@
+use crate::split::{
+ split, split_mut, UnixStreamReadHalf, UnixStreamReadHalfMut, UnixStreamWriteHalf,
+ UnixStreamWriteHalfMut,
+};
use crate::ucred::{self, UCred};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -102,6 +106,24 @@ impl UnixStream {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}
+
+ /// Split a `UnixStream` into a read half and a write half, which can be used
+ /// to read and write the stream concurrently.
+ ///
+ /// See the module level documenation of [`split`](super::split) for more
+ /// details.
+ pub fn split(self) -> (UnixStreamReadHalf, UnixStreamWriteHalf) {
+ split(self)
+ }
+
+ /// Split a `UnixStream` into a read half and a write half, which can be used
+ /// to read and write the stream concurrently.
+ ///
+ /// See the module level documenation of [`split`](super::split) for more
+ /// details.
+ pub fn split_mut(&mut self) -> (UnixStreamReadHalfMut<'_>, UnixStreamWriteHalfMut<'_>) {
+ split_mut(self)
+ }
}
impl TryFrom<UnixStream> for mio_uds::UnixStream {
diff --git a/tokio-uds/tests/split.rs b/tokio-uds/tests/split.rs
new file mode 100644
index 00000000..4afd7687
--- /dev/null
+++ b/tokio-uds/tests/split.rs
@@ -0,0 +1,43 @@
+#![cfg(unix)]
+#![feature(async_await)]
+#![deny(warnings, rust_2018_idioms)]
+
+use tokio::prelude::*;
+use tokio_uds::UnixStream;
+
+/// Checks that `UnixStream` can be split into a read half and a write half using
+/// `UnixStream::split` and `UnixStream::split_mut`.
+///
+/// Verifies that the implementation of `AsyncWrite::poll_shutdown` shutdowns the stream for
+/// writing by reading to the end of stream on the other side of the connection.
+#[tokio::test]
+async fn split() -> std::io::Result<()> {
+ let (a, mut b) = UnixStream::pair()?;
+
+ let (mut a_read, mut a_write) = a.split();
+ let (mut b_read, mut b_write) = b.split_mut();
+
+ let (a_response, b_response) = futures::future::try_join(
+ send_recv_all(&mut a_read, &mut a_write, b"A"),
+ send_recv_all(&mut b_read, &mut b_write, b"B"),
+ )
+ .await?;
+
+ assert_eq!(a_response, b"B");
+ assert_eq!(b_response, b"A");
+
+ Ok(())
+}
+
+async fn send_recv_all(
+ read: &mut (dyn AsyncRead + Unpin),
+ write: &mut (dyn AsyncWrite + Unpin),
+ input: &[u8],
+) -> std::io::Result<Vec<u8>> {
+ write.write_all(input).await?;
+ write.shutdown().await?;
+
+ let mut output = Vec::new();
+ read.read_to_end(&mut output).await?;
+ Ok(output)
+}
diff --git a/tokio/src/net.rs b/tokio/src/net.rs
index ba5157e3..74ee97fa 100644
--- a/tokio/src/net.rs
+++ b/tokio/src/net.rs
@@ -67,7 +67,7 @@ pub use self::udp::UdpSocket;
pub mod unix {
//! Unix domain socket bindings for `tokio` (only available on unix systems).
- pub use tokio_uds::{UCred, UnixDatagram, UnixListener, UnixStream};
+ pub use tokio_uds::{split, UCred, UnixDatagram, UnixListener, UnixStream};
}
#[cfg(all(unix, feature = "uds"))]
pub use self::unix::{UnixDatagram, UnixListener, UnixStream};