From 2a30e13f38b864807f9ad92023e91b060a6227a4 Mon Sep 17 00:00:00 2001 From: cssivision Date: Thu, 10 Dec 2020 15:36:43 +0800 Subject: net: expose poll_* methods on UnixDatagram (#3223) --- tokio/src/net/unix/datagram/socket.rs | 561 ++++++++++++++++++++++++++++------ tokio/tests/uds_datagram.rs | 96 ++++++ 2 files changed, 563 insertions(+), 94 deletions(-) diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index f9e9321b..fb5f6029 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,4 +1,4 @@ -use crate::io::{Interest, PollEvented}; +use crate::io::{Interest, PollEvented, ReadBuf, Ready}; use crate::net::unix::SocketAddr; use std::convert::TryFrom; @@ -8,6 +8,7 @@ use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net; use std::path::Path; +use std::task::{Context, Poll}; cfg_net_unix! { /// An I/O object representing a Unix datagram socket. @@ -83,6 +84,178 @@ cfg_net_unix! { } impl UnixDatagram { + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_recv()` or `try_send()`. It + /// can be used to concurrently recv / send to the same socket on a single + /// task without splitting the socket. + /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// Concurrently receive from and send to the socket on the same task + /// without splitting. + /// + /// ```no_run + /// use tokio::io::Interest; + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// let mut data = [0; 1024]; + /// match socket.try_recv(&mut data[..]) { + /// Ok(n) => { + /// println!("received {:?}", &data[..n]); + /// } + /// // False-positive, continue + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// if ready.is_writable() { + /// // Write some data + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// println!("sent {} bytes", n); + /// } + /// // False-positive, continue + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// } + /// } + /// ``` + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Wait for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is + /// usually paired with `try_send()` or `try_send_to()`. + /// + /// The function may complete without the socket being writable. This is a + /// false-positive and attempting a `try_send()` will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Wait for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_recv()`. + /// + /// The function may complete without the socket being readable. This is a + /// false-positive and attempting a `try_recv()` will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + /// Creates a new `UnixDatagram` bound to the specified path. /// /// # Examples @@ -309,68 +482,91 @@ impl UnixDatagram { /// Try to send a datagram to the peer without waiting. /// /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; /// - /// let bytes = b"bytes"; - /// // We use a socket pair so that they are assigned - /// // each other as a peer. - /// let (first, second) = UnixDatagram::pair()?; - /// - /// let size = first.try_send(bytes)?; - /// assert_eq!(size, bytes.len()); - /// - /// let mut buffer = vec![0u8; 24]; - /// let size = second.try_recv(&mut buffer)?; - /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// # Ok(()) - /// # } + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_send(&self, buf: &[u8]) -> io::Result { - self.io.send(buf) + self.io + .registration() + .try_io(Interest::WRITABLE, || self.io.send(buf)) } /// Try to send a datagram to the peer without waiting. /// /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// use tempfile::tempdir; - /// - /// let bytes = b"bytes"; - /// // We use a temporary directory so that the socket - /// // files left by the bound sockets will get cleaned up. - /// let tmp = tempdir().unwrap(); /// - /// let server_path = tmp.path().join("server"); - /// let server = UnixDatagram::bind(&server_path)?; - /// - /// let client_path = tmp.path().join("client"); - /// let client = UnixDatagram::bind(&client_path)?; - /// - /// let size = client.try_send_to(bytes, &server_path)?; - /// assert_eq!(size, bytes.len()); - /// - /// let mut buffer = vec![0u8; 24]; - /// let (size, addr) = server.try_recv_from(&mut buffer)?; - /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// assert_eq!(addr.as_pathname().unwrap(), &client_path); - /// # Ok(()) - /// # } + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send_to(b"hello world", &server_path) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_send_to

(&self, buf: &[u8], target: P) -> io::Result where P: AsRef, { - self.io.send_to(buf, target) + self.io + .registration() + .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) } /// Receives data from the socket. @@ -409,29 +605,51 @@ impl UnixDatagram { /// Try to receive a datagram from the peer without waiting. /// /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// - /// let bytes = b"bytes"; - /// // We use a socket pair so that they are assigned - /// // each other as a peer. - /// let (first, second) = UnixDatagram::pair()?; - /// - /// let size = first.try_send(bytes)?; - /// assert_eq!(size, bytes.len()); - /// - /// let mut buffer = vec![0u8; 24]; - /// let size = second.try_recv(&mut buffer)?; /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// # Ok(()) - /// # } + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// socket.connect(&server_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_recv(&self, buf: &mut [u8]) -> io::Result { - self.io.recv(buf) + self.io + .registration() + .try_io(Interest::READABLE, || self.io.recv(buf)) } /// Sends data on the socket to the specified address. @@ -520,40 +738,195 @@ impl UnixDatagram { Ok((n, SocketAddr(addr))) } - /// Try to receive data from the socket without waiting. + /// Attempts to receive a single datagram on the specified address. /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { - /// use tokio::net::UnixDatagram; - /// use tempfile::tempdir; + /// Note that on multiple calls to a `poll_*` method in the recv direction, only the + /// `Waker` from the `Context` passed to the most recent call will be scheduled to + /// receive a wakeup. /// - /// let bytes = b"bytes"; - /// // We use a temporary directory so that the socket - /// // files left by the bound sockets will get cleaned up. - /// let tmp = tempdir().unwrap(); + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not ready to read + /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready + /// * `Poll::Ready(Err(e))` if an error is encountered. /// - /// let server_path = tmp.path().join("server"); - /// let server = UnixDatagram::bind(&server_path)?; + /// # Errors /// - /// let client_path = tmp.path().join("client"); - /// let client = UnixDatagram::bind(&client_path)?; + /// This function may encounter any standard I/O error except `WouldBlock`. + pub fn poll_recv_from( + &self, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { + // Safety: will not read the maybe uinitialized bytes. + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) + }; + + self.io.recv_from(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); + } + buf.advance(n); + Poll::Ready(Ok(SocketAddr(addr))) + } + + /// Attempts to send data to the specified address. /// - /// let size = client.try_send_to(bytes, &server_path)?; - /// assert_eq!(size, bytes.len()); + /// Note that on multiple calls to a `poll_*` method in the send direction, only the + /// `Waker` from the `Context` passed to the most recent call will be scheduled to + /// receive a wakeup. /// - /// let mut buffer = vec![0u8; 24]; - /// let (size, addr) = server.try_recv_from(&mut buffer)?; + /// # Return value /// - /// let dgram = &buffer[..size]; - /// assert_eq!(dgram, bytes); - /// assert_eq!(addr.as_pathname().unwrap(), &client_path); - /// # Ok(()) - /// # } + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not ready to write + /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + pub fn poll_send_to

( + &self, + cx: &mut Context<'_>, + buf: &[u8], + target: P, + ) -> Poll> + where + P: AsRef, + { + self.io + .registration() + .poll_write_io(cx, || self.io.send_to(buf, target.as_ref())) + } + + /// Attempts to send data on the socket to the remote address to which it + /// was previously `connect`ed. + /// + /// The [`connect`] method will connect this socket to a remote address. + /// This method will fail if the socket is not connected. + /// + /// Note that on multiple calls to a `poll_*` method in the send direction, + /// only the `Waker` from the `Context` passed to the most recent call will + /// be scheduled to receive a wakeup. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not available to write + /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`connect`]: method@Self::connect + pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + self.io + .registration() + .poll_write_io(cx, || self.io.send(buf)) + } + + /// Attempts to receive a single datagram message on the socket from the remote + /// address to which it is `connect`ed. + /// + /// The [`connect`] method will connect this socket to a remote address. This method + /// resolves to an error if the socket is not connected. + /// + /// Note that on multiple calls to a `poll_*` method in the recv direction, only the + /// `Waker` from the `Context` passed to the most recent call will be scheduled to + /// receive a wakeup. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not ready to read + /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`connect`]: method@Self::connect + pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let n = ready!(self.io.registration().poll_read_io(cx, || { + // Safety: will not read the maybe uinitialized bytes. + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) + }; + + self.io.recv(b) + }))?; + + // Safety: We trust `recv` to have filled up `n` bytes in the buffer. + unsafe { + buf.assume_init(n); + } + buf.advance(n); + Poll::Ready(Ok(())) + } + + /// Try to receive data from the socket without waiting. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixDatagram; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let dir = tempfile::tempdir().unwrap(); + /// let client_path = dir.path().join("client.sock"); + /// let server_path = dir.path().join("server.sock"); + /// let socket = UnixDatagram::bind(&client_path)?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv_from(&mut buf) { + /// Ok((n, _addr)) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } /// ``` pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - let (n, addr) = self.io.recv_from(buf)?; + let (n, addr) = self + .io + .registration() + .try_io(Interest::READABLE, || self.io.recv_from(buf))?; + Ok((n, SocketAddr(addr))) } diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index ec2f6f82..cdabd7b1 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -2,6 +2,8 @@ #![cfg(feature = "full")] #![cfg(unix)] +use futures::future::poll_fn; +use tokio::io::ReadBuf; use tokio::net::UnixDatagram; use tokio::try_join; @@ -82,6 +84,8 @@ async fn try_send_recv_never_block() -> io::Result<()> { // Send until we hit the OS `net.unix.max_dgram_qlen`. loop { + dgram1.writable().await.unwrap(); + match dgram1.try_send(payload) { Err(err) => match err.kind() { io::ErrorKind::WouldBlock | io::ErrorKind::Other => break, @@ -96,6 +100,7 @@ async fn try_send_recv_never_block() -> io::Result<()> { // Read every dgram we sent. while count > 0 { + dgram2.readable().await.unwrap(); let len = dgram2.try_recv(&mut recv_buf[..])?; assert_eq!(len, payload.len()); assert_eq!(payload, &recv_buf[..len]); @@ -134,3 +139,94 @@ async fn split() -> std::io::Result<()> { Ok(()) } + +#[tokio::test] +async fn send_to_recv_from_poll() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let sender_path = dir.path().join("sender.sock"); + let receiver_path = dir.path().join("receiver.sock"); + + let sender = UnixDatagram::bind(&sender_path)?; + let receiver = UnixDatagram::bind(&receiver_path)?; + + let msg = b"hello"; + poll_fn(|cx| sender.poll_send_to(cx, msg, &receiver_path)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?; + + assert_eq!(read.filled(), msg); + assert_eq!(addr.as_pathname(), Some(sender_path.as_ref())); + Ok(()) +} + +#[tokio::test] +async fn send_recv_poll() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let sender_path = dir.path().join("sender.sock"); + let receiver_path = dir.path().join("receiver.sock"); + + let sender = UnixDatagram::bind(&sender_path)?; + let receiver = UnixDatagram::bind(&receiver_path)?; + + sender.connect(&receiver_path)?; + receiver.connect(&sender_path)?; + + let msg = b"hello"; + poll_fn(|cx| sender.poll_send(cx, msg)).await?; + + let mut recv_buf = [0u8; 32]; + let mut read = ReadBuf::new(&mut recv_buf); + let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?; + + assert_eq!(read.filled(), msg); + Ok(()) +} + +#[tokio::test] +async fn try_send_to_recv_from() -> std::io::Result<()> { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + // Create listener + let server = UnixDatagram::bind(&server_path)?; + + // Create socket pair + let client = UnixDatagram::bind(&client_path)?; + + for _ in 0..5 { + loop { + client.writable().await?; + + match client.try_send_to(b"hello world", &server_path) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await?; + + let mut buf = [0; 512]; + + match server.try_recv_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr.as_pathname(), Some(client_path.as_ref())); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } + + Ok(()) +} -- cgit v1.2.3