use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; use crate::net::unix::split::{split, ReadHalf, WriteHalf}; use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::unix::ucred::{self, UCred}; use crate::net::unix::SocketAddr; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; cfg_net_unix! { /// A structure representing a connected Unix socket. /// /// This socket can be connected directly with `UnixStream::connect` or accepted /// from a listener with `UnixListener::incoming`. Additionally, a pair of /// anonymous Unix sockets can be created with `UnixStream::pair`. pub struct UnixStream { io: PollEvented, } } impl UnixStream { /// Connects to the socket named by `path`. /// /// This function will create a new Unix socket and connect to the path /// specified, associating the returned stream with the default event loop's /// handle. pub async fn connect

(path: P) -> io::Result where P: AsRef, { let stream = mio::net::UnixStream::connect(path)?; let stream = UnixStream::new(stream)?; poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; Ok(stream) } /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. /// /// This function is intended to be used to wrap a UnixStream from the /// standard library in the Tokio equivalent. The conversion assumes /// nothing about the underlying stream; it is left up to the user to set /// it in non-blocking mode. /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::UnixStream) -> io::Result { let stream = mio::net::UnixStream::from_std(stream); let io = PollEvented::new(stream)?; Ok(UnixStream { io }) } /// Creates an unnamed pair of connected sockets. /// /// This function will create a pair of interconnected Unix sockets for /// communicating back and forth between one another. Each socket will /// be associated with the default event loop's handle. pub fn pair() -> io::Result<(UnixStream, UnixStream)> { let (a, b) = mio::net::UnixStream::pair()?; let a = UnixStream::new(a)?; let b = UnixStream::new(b)?; Ok((a, b)) } pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result { let io = PollEvented::new(stream)?; Ok(UnixStream { io }) } /// Returns the socket address of the local half of this connection. pub fn local_addr(&self) -> io::Result { self.io.local_addr().map(SocketAddr) } /// Returns the socket address of the remote half of this connection. pub fn peer_addr(&self) -> io::Result { self.io.peer_addr().map(SocketAddr) } /// Returns effective credentials of the process which called `connect` or `pair`. pub fn peer_cred(&self) -> io::Result { ucred::get_peer_cred(self) } /// Returns the value of the `SO_ERROR` option. pub fn take_error(&self) -> io::Result> { self.io.take_error() } /// Shuts down the read, write, or both halves of this connection. /// /// This function will cause all pending and future I/O calls on the /// specified portions to immediately return with an appropriate value /// (see the documentation of `Shutdown`). pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.shutdown(how) } // These lifetime markers also appear in the generated documentation, and make // it more clear that this is a *borrowed* split. #[allow(clippy::needless_lifetimes)] /// Split a `UnixStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. /// /// This method is more efficient than [`into_split`], but the halves cannot be /// moved into independently spawned tasks. /// /// [`into_split`]: Self::into_split() pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { split(self) } /// Splits a `UnixStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. /// /// Unlike [`split`], the owned halves can be moved to separate tasks, however /// this comes at the cost of a heap allocation. /// /// **Note:** Dropping the write half will shut down the write half of the /// stream. This is equivalent to calling [`shutdown(Write)`] on the `UnixStream`. /// /// [`split`]: Self::split() /// [`shutdown(Write)`]: fn@Self::shutdown pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { split_owned(self) } } impl TryFrom for UnixStream { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UnixStream::from_std(stream)`](UnixStream::from_std). fn try_from(stream: net::UnixStream) -> io::Result { Self::from_std(stream) } } impl AsyncRead for UnixStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { self.poll_read_priv(cx, buf) } } impl AsyncWrite for UnixStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.poll_write_priv(cx, buf) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { self.poll_write_vectored_priv(cx, bufs) } fn is_write_vectored(&self) -> bool { true } fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.shutdown(std::net::Shutdown::Write)?; Poll::Ready(Ok(())) } } impl UnixStream { // == Poll IO functions that takes `&self` == // // They are not public because (taken from the doc of `PollEvented`): // // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the // caller must ensure that there are at most two tasks that use a // `PollEvented` instance concurrently. One for reading and one for writing. // While violating this requirement is "safe" from a Rust memory model point // of view, it will result in unexpected behavior in the form of lost // notifications and tasks hanging. pub(crate) fn poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { // Safety: `UnixStream::read` correctly handles reads into uninitialized memory unsafe { self.io.poll_read(cx, buf) } } pub(crate) fn poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.io.poll_write(cx, buf) } pub(super) fn poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { self.io.poll_write_vectored(cx, bufs) } } impl fmt::Debug for UnixStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.io.fmt(f) } } impl AsRawFd for UnixStream { fn as_raw_fd(&self) -> RawFd { self.io.as_raw_fd() } }