From 1e585ccb516c8dc7c13cbc3d50f8ca49260b9617 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 2 Oct 2020 13:54:00 -0700 Subject: io: update to Mio 0.7 (#2893) This also makes Mio an implementation detail, removing it from the public API. This is based on #1767. --- tokio/src/net/tcp/listener.rs | 32 +---- tokio/src/net/tcp/stream.rs | 239 +--------------------------------- tokio/src/net/udp/socket.rs | 33 ++--- tokio/src/net/unix/datagram/socket.rs | 50 ++++--- tokio/src/net/unix/listener.rs | 44 ++----- tokio/src/net/unix/mod.rs | 3 + tokio/src/net/unix/socketaddr.rs | 31 +++++ tokio/src/net/unix/stream.rs | 26 ++-- 8 files changed, 104 insertions(+), 354 deletions(-) create mode 100644 tokio/src/net/unix/socketaddr.rs (limited to 'tokio/src/net') diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index ef0a87c0..b8f61a63 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -150,7 +150,7 @@ impl TcpListener { } fn bind_addr(addr: SocketAddr) -> io::Result { - let listener = mio::net::TcpListener::bind(&addr)?; + let listener = mio::net::TcpListener::bind(addr)?; TcpListener::new(listener) } @@ -193,23 +193,14 @@ impl TcpListener { &mut self, cx: &mut Context<'_>, ) -> Poll> { - let (io, addr) = ready!(self.poll_accept_std(cx))?; - - let io = mio::net::TcpStream::from_stream(io)?; - let io = TcpStream::new(io)?; - - Poll::Ready(Ok((io, addr))) - } - - fn poll_accept_std( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { loop { let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(pair) => return Poll::Ready(Ok(pair)), + match self.io.get_ref().accept() { + Ok((io, addr)) => { + let io = TcpStream::new(io)?; + return Poll::Ready(Ok((io, addr))); + } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_readiness(ev); } @@ -265,7 +256,7 @@ impl TcpListener { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::TcpListener) -> io::Result { - let io = mio::net::TcpListener::from_std(listener)?; + let io = mio::net::TcpListener::from_std(listener); let io = PollEvented::new(io)?; Ok(TcpListener { io }) } @@ -408,15 +399,6 @@ impl crate::stream::Stream for TcpListener { } } -impl TryFrom for mio::net::TcpListener { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: TcpListener) -> Result { - value.io.into_inner() - } -} - impl TryFrom for TcpListener { type Error = io::Error; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index f35f8b0c..50536e98 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -7,10 +7,9 @@ use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; -use std::net::{self, Shutdown, SocketAddr}; +use std::net::{Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; cfg_tcp! { /// A TCP stream between a local and a remote socket. @@ -137,7 +136,7 @@ impl TcpStream { /// Establishes a connection to the specified `addr`. async fn connect_addr(addr: SocketAddr) -> io::Result { - let sys = mio::net::TcpStream::connect(&addr)?; + let sys = mio::net::TcpStream::connect(addr)?; let stream = TcpStream::new(sys)?; // Once we've connected, wait for the stream to be writable as @@ -186,40 +185,12 @@ impl TcpStream { /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn from_std(stream: net::TcpStream) -> io::Result { - let io = mio::net::TcpStream::from_stream(stream)?; + pub fn from_std(stream: std::net::TcpStream) -> io::Result { + let io = mio::net::TcpStream::from_std(stream); let io = PollEvented::new(io)?; Ok(TcpStream { io }) } - /// Connects `TcpStream` asynchronously that may be built with a net2 `TcpBuilder`. - /// - /// This function is intended to be replaced with some sort of TcpSocket builder. - /// See https://github.com/tokio-rs/tokio/issues/2902 - /// - /// Despite being hidden, this function is part of the public API of Tokio v0.3, but - /// will be removed in v1.0 in favor of a better design. - #[doc(hidden)] - pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result { - let io = mio::net::TcpStream::connect_stream(stream, addr)?; - let io = PollEvented::new(io)?; - let stream = TcpStream { io }; - - // Once we've connected, wait for the stream to be writable as - // that's when the actual connection has been initiated. Once we're - // writable we check for `take_socket_error` to see if the connect - // actually hit an error or not. - // - // If all that succeeded then we ship everything on up. - poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; - - if let Some(e) = stream.io.get_ref().take_error()? { - return Err(e); - } - - Ok(stream) - } - /// Returns the local address that this stream is bound to. /// /// # Examples @@ -429,144 +400,6 @@ impl TcpStream { self.io.get_ref().set_nodelay(nodelay) } - /// Gets the value of the `SO_RCVBUF` option on this socket. - /// - /// For more information about this option, see [`set_recv_buffer_size`]. - /// - /// [`set_recv_buffer_size`]: TcpStream::set_recv_buffer_size - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.recv_buffer_size()?); - /// # Ok(()) - /// # } - /// ``` - pub fn recv_buffer_size(&self) -> io::Result { - self.io.get_ref().recv_buffer_size() - } - - /// Sets the value of the `SO_RCVBUF` option on this socket. - /// - /// Changes the size of the operating system's receive buffer associated - /// with the socket. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_recv_buffer_size(100)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { - self.io.get_ref().set_recv_buffer_size(size) - } - - /// Gets the value of the `SO_SNDBUF` option on this socket. - /// - /// For more information about this option, see [`set_send_buffer_size`]. - /// - /// [`set_send_buffer_size`]: TcpStream::set_send_buffer_size - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.send_buffer_size()?); - /// # Ok(()) - /// # } - /// ``` - pub fn send_buffer_size(&self) -> io::Result { - self.io.get_ref().send_buffer_size() - } - - /// Sets the value of the `SO_SNDBUF` option on this socket. - /// - /// Changes the size of the operating system's send buffer associated with - /// the socket. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_send_buffer_size(100)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { - self.io.get_ref().set_send_buffer_size(size) - } - - /// Returns whether keepalive messages are enabled on this socket, and if so - /// the duration of time between them. - /// - /// For more information about this option, see [`set_keepalive`]. - /// - /// [`set_keepalive`]: TcpStream::set_keepalive - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.keepalive()?); - /// # Ok(()) - /// # } - /// ``` - pub fn keepalive(&self) -> io::Result> { - self.io.get_ref().keepalive() - } - - /// Sets whether keepalive messages are enabled to be sent on this socket. - /// - /// On Unix, this option will set the `SO_KEEPALIVE` as well as the - /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). - /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. - /// - /// If `None` is specified then keepalive messages are disabled, otherwise - /// the duration specified will be the time to remain idle before sending a - /// TCP keepalive probe. - /// - /// Some platforms specify this value in seconds, so sub-second - /// specifications may be omitted. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_keepalive(None)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_keepalive(&self, keepalive: Option) -> io::Result<()> { - self.io.get_ref().set_keepalive(keepalive) - } - /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`]. @@ -610,57 +443,6 @@ impl TcpStream { self.io.get_ref().set_ttl(ttl) } - /// Reads the linger duration for this socket by getting the `SO_LINGER` - /// option. - /// - /// For more information about this option, see [`set_linger`]. - /// - /// [`set_linger`]: TcpStream::set_linger - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// println!("{:?}", stream.linger()?); - /// # Ok(()) - /// # } - /// ``` - pub fn linger(&self) -> io::Result> { - self.io.get_ref().linger() - } - - /// Sets the linger duration of this socket by setting the `SO_LINGER` - /// option. - /// - /// This option controls the action taken when a stream has unsent messages - /// and the stream is closed. If `SO_LINGER` is set, the system - /// shall block the process until it can transmit the data or until the - /// time expires. - /// - /// If `SO_LINGER` is not specified, and the stream is closed, the system - /// handles the call in a way that allows the process to continue as quickly - /// as possible. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// - /// # async fn dox() -> Result<(), Box> { - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// stream.set_linger(None)?; - /// # Ok(()) - /// # } - /// ``` - pub fn set_linger(&self, dur: Option) -> io::Result<()> { - self.io.get_ref().set_linger(dur) - } - // These lifetime markers also appear in the generated documentation, and make // it more clear that this is a *borrowed* split. #[allow(clippy::needless_lifetimes)] @@ -749,23 +531,14 @@ impl TcpStream { } } -impl TryFrom for mio::net::TcpStream { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: TcpStream) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for TcpStream { +impl TryFrom for TcpStream { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`TcpStream::from_std(stream)`](TcpStream::from_std). - fn try_from(stream: net::TcpStream) -> Result { + fn try_from(stream: std::net::TcpStream) -> Result { Self::from_std(stream) } } diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index 1b943184..3a4c7c22 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -36,7 +36,7 @@ impl UdpSocket { } fn bind_addr(addr: SocketAddr) -> io::Result { - let sys = mio::net::UdpSocket::bind(&addr)?; + let sys = mio::net::UdpSocket::bind(addr)?; UdpSocket::new(sys) } @@ -63,7 +63,7 @@ impl UdpSocket { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result { - let io = mio::net::UdpSocket::from_socket(socket)?; + let io = mio::net::UdpSocket::from_std(socket); UdpSocket::new(io) } @@ -103,7 +103,7 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn send(&self, buf: &[u8]) -> io::Result { self.io - .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) .await } @@ -135,7 +135,7 @@ impl UdpSocket { /// [`connect`]: method@Self::connect pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io - .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) .await } @@ -148,7 +148,7 @@ impl UdpSocket { let mut addrs = to_socket_addrs(target).await?; match addrs.next() { - Some(target) => self.send_to_addr(buf, &target).await, + Some(target) => self.send_to_addr(buf, target).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -168,12 +168,12 @@ impl UdpSocket { /// /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { - self.io.get_ref().send_to(buf, &target) + self.io.get_ref().send_to(buf, target) } - async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result { + async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io - .async_io(mio::Ready::writable(), |sock| sock.send_to(buf, target)) + .async_io(mio::Interest::WRITABLE, |sock| sock.send_to(buf, target)) .await } @@ -185,7 +185,7 @@ impl UdpSocket { /// buffer, excess bytes may be discarded. pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io - .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) .await } @@ -324,30 +324,21 @@ impl UdpSocket { } } -impl TryFrom for mio::net::UdpSocket { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UdpSocket) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for UdpSocket { +impl TryFrom for UdpSocket { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). - fn try_from(stream: net::UdpSocket) -> Result { + fn try_from(stream: std::net::UdpSocket) -> Result { Self::from_std(stream) } } impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.fmt(f) + self.io.get_ref().fmt(f) } } diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 78baf279..20c6c227 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,11 +1,12 @@ use crate::io::PollEvented; +use crate::net::unix::SocketAddr; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; cfg_uds! { @@ -77,7 +78,7 @@ cfg_uds! { /// # } /// ``` pub struct UnixDatagram { - io: PollEvented, + io: PollEvented, } } @@ -107,7 +108,7 @@ impl UnixDatagram { where P: AsRef, { - let socket = mio_uds::UnixDatagram::bind(path)?; + let socket = mio::net::UnixDatagram::bind(path)?; UnixDatagram::new(socket) } @@ -141,7 +142,7 @@ impl UnixDatagram { /// # } /// ``` pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { - let (a, b) = mio_uds::UnixDatagram::pair()?; + let (a, b) = mio::net::UnixDatagram::pair()?; let a = UnixDatagram::new(a)?; let b = UnixDatagram::new(b)?; @@ -183,12 +184,12 @@ impl UnixDatagram { /// # } /// ``` pub fn from_std(datagram: net::UnixDatagram) -> io::Result { - let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; + let socket = mio::net::UnixDatagram::from_std(datagram); let io = PollEvented::new(socket)?; Ok(UnixDatagram { io }) } - fn new(socket: mio_uds::UnixDatagram) -> io::Result { + fn new(socket: mio::net::UnixDatagram) -> io::Result { let io = PollEvented::new(socket)?; Ok(UnixDatagram { io }) } @@ -225,7 +226,7 @@ impl UnixDatagram { /// # } /// ``` pub fn unbound() -> io::Result { - let socket = mio_uds::UnixDatagram::unbound()?; + let socket = mio::net::UnixDatagram::unbound()?; UnixDatagram::new(socket) } @@ -298,7 +299,7 @@ impl UnixDatagram { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { self.io - .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .async_io(mio::Interest::WRITABLE, |sock| sock.send(buf)) .await } @@ -397,7 +398,7 @@ impl UnixDatagram { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io - .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .async_io(mio::Interest::READABLE, |sock| sock.recv(buf)) .await } @@ -467,7 +468,7 @@ impl UnixDatagram { P: AsRef, { self.io - .async_io(mio::Ready::writable(), |sock| { + .async_io(mio::Interest::WRITABLE, |sock| { sock.send_to(buf, target.as_ref()) }) .await @@ -507,9 +508,12 @@ impl UnixDatagram { /// # } /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.io - .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) - .await + let (n, addr) = self + .io + .async_io(mio::Interest::READABLE, |sock| sock.recv_from(buf)) + .await?; + + Ok((n, SocketAddr(addr))) } /// Try to receive data from the socket without waiting. @@ -545,7 +549,8 @@ impl UnixDatagram { /// # } /// ``` pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.io.get_ref().recv_from(buf) + let (n, addr) = self.io.get_ref().recv_from(buf)?; + Ok((n, SocketAddr(addr))) } /// Returns the local address that this socket is bound to. @@ -589,7 +594,7 @@ impl UnixDatagram { /// # } /// ``` pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the address of this socket's peer. @@ -638,7 +643,7 @@ impl UnixDatagram { /// # } /// ``` pub fn peer_addr(&self) -> io::Result { - self.io.get_ref().peer_addr() + self.io.get_ref().peer_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. @@ -701,23 +706,14 @@ impl UnixDatagram { } } -impl TryFrom for mio_uds::UnixDatagram { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UnixDatagram) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for UnixDatagram { +impl TryFrom for UnixDatagram { type Error = io::Error; /// Consumes stream, returning the Tokio I/O object. /// /// This is equivalent to /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). - fn try_from(stream: net::UnixDatagram) -> Result { + fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result { Self::from_std(stream) } } diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index dc8cb08e..5d586ec3 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -1,12 +1,12 @@ use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::{Incoming, UnixStream}; +use crate::net::unix::{Incoming, SocketAddr, UnixStream}; use std::convert::TryFrom; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; use std::task::{Context, Poll}; @@ -46,7 +46,7 @@ cfg_uds! { /// } /// ``` pub struct UnixListener { - io: PollEvented, + io: PollEvented, } } @@ -64,7 +64,7 @@ impl UnixListener { where P: AsRef, { - let listener = mio_uds::UnixListener::bind(path)?; + let listener = mio::net::UnixListener::bind(path)?; let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } @@ -83,14 +83,14 @@ impl UnixListener { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(listener: net::UnixListener) -> io::Result { - let listener = mio_uds::UnixListener::from_listener(listener)?; + let listener = mio::net::UnixListener::from_std(listener); let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } /// Returns the local socket address of this listener. pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the value of the `SO_ERROR` option. @@ -111,24 +111,15 @@ impl UnixListener { &mut self, cx: &mut Context<'_>, ) -> Poll> { - let (io, addr) = ready!(self.poll_accept_std(cx))?; - - let io = mio_uds::UnixStream::from_stream(io)?; - Ok((UnixStream::new(io)?, addr)).into() - } - - fn poll_accept_std( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { loop { let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(None) => { - self.io.clear_readiness(ev); + match self.io.get_ref().accept() { + Ok((sock, addr)) => { + let addr = SocketAddr(addr); + let sock = UnixStream::new(sock)?; + return Poll::Ready(Ok((sock, addr))); } - Ok(Some((sock, addr))) => return Ok((sock, addr)).into(), Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { self.io.clear_readiness(ev); } @@ -192,23 +183,14 @@ impl crate::stream::Stream for UnixListener { } } -impl TryFrom for mio_uds::UnixListener { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UnixListener) -> Result { - value.io.into_inner() - } -} - -impl TryFrom for UnixListener { +impl TryFrom for UnixListener { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UnixListener::from_std(stream)`](UnixListener::from_std). - fn try_from(stream: net::UnixListener) -> io::Result { + fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result { Self::from_std(stream) } } diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index b079fe04..21aa4fe7 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -14,6 +14,9 @@ pub use split::{ReadHalf, WriteHalf}; mod split_owned; pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError}; +mod socketaddr; +pub use socketaddr::SocketAddr; + pub(crate) mod stream; pub(crate) use stream::UnixStream; diff --git a/tokio/src/net/unix/socketaddr.rs b/tokio/src/net/unix/socketaddr.rs new file mode 100644 index 00000000..48f7b96b --- /dev/null +++ b/tokio/src/net/unix/socketaddr.rs @@ -0,0 +1,31 @@ +use std::fmt; +use std::path::Path; + +/// An address associated with a Tokio Unix socket. +pub struct SocketAddr(pub(super) mio::net::SocketAddr); + +impl SocketAddr { + /// Returns `true` if the address is unnamed. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn is_unnamed(&self) -> bool { + self.0.is_unnamed() + } + + /// Returns the contents of this address if it is a `pathname` address. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn as_pathname(&self) -> Option<&Path> { + self.0.as_pathname() + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 715ed7aa..d0f98f43 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -3,13 +3,14 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf}; use crate::net::unix::split::{split, ReadHalf, WriteHalf}; use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::unix::ucred::{self, UCred}; +use crate::net::unix::SocketAddr; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::{self, SocketAddr}; +use std::os::unix::net; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; @@ -21,7 +22,7 @@ cfg_uds! { /// from a listener with `UnixListener::incoming`. Additionally, a pair of /// anonymous Unix sockets can be created with `UnixStream::pair`. pub struct UnixStream { - io: PollEvented, + io: PollEvented, } } @@ -35,7 +36,7 @@ impl UnixStream { where P: AsRef, { - let stream = mio_uds::UnixStream::connect(path)?; + let stream = mio::net::UnixStream::connect(path)?; let stream = UnixStream::new(stream)?; poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; @@ -56,7 +57,7 @@ impl UnixStream { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(stream: net::UnixStream) -> io::Result { - let stream = mio_uds::UnixStream::from_stream(stream)?; + let stream = mio::net::UnixStream::from_std(stream); let io = PollEvented::new(stream)?; Ok(UnixStream { io }) @@ -68,26 +69,26 @@ impl UnixStream { /// communicating back and forth between one another. Each socket will /// be associated with the default event loop's handle. pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = mio_uds::UnixStream::pair()?; + let (a, b) = mio::net::UnixStream::pair()?; let a = UnixStream::new(a)?; let b = UnixStream::new(b)?; Ok((a, b)) } - pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result { + pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result { let io = PollEvented::new(stream)?; Ok(UnixStream { io }) } /// Returns the socket address of the local half of this connection. pub fn local_addr(&self) -> io::Result { - self.io.get_ref().local_addr() + self.io.get_ref().local_addr().map(SocketAddr) } /// Returns the socket address of the remote half of this connection. pub fn peer_addr(&self) -> io::Result { - self.io.get_ref().peer_addr() + self.io.get_ref().peer_addr().map(SocketAddr) } /// Returns effective credentials of the process which called `connect` or `pair`. @@ -139,15 +140,6 @@ impl UnixStream { } } -impl TryFrom for mio_uds::UnixStream { - type Error = io::Error; - - /// Consumes value, returning the mio I/O object. - fn try_from(value: UnixStream) -> Result { - value.io.into_inner() - } -} - impl TryFrom for UnixStream { type Error = io::Error; -- cgit v1.2.3