summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcssivision <cssivision@gmail.com>2020-12-10 15:36:43 +0800
committerGitHub <noreply@github.com>2020-12-10 08:36:43 +0100
commit2a30e13f38b864807f9ad92023e91b060a6227a4 (patch)
tree2ff07d9ed9f82c562e03ae7f13dd05150ffe899f
parent52cd240053b2e1dd5835186539f563c3496dfd7d (diff)
downloadtokio-2a30e13f38b864807f9ad92023e91b060a6227a4.tar.gz
tokio-2a30e13f38b864807f9ad92023e91b060a6227a4.tar.xz
net: expose poll_* methods on UnixDatagram (#3223)
-rw-r--r--tokio/src/net/unix/datagram/socket.rs561
-rw-r--r--tokio/tests/uds_datagram.rs96
2 files changed, 563 insertions, 94 deletions
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index f9e9321b..fb5f6029 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -1,4 +1,4 @@
-use crate::io::{Interest, PollEvented};
+use crate::io::{Interest, PollEvented, ReadBuf, Ready};
use crate::net::unix::SocketAddr;
use std::convert::TryFrom;
@@ -8,6 +8,7 @@ use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net;
use std::path::Path;
+use std::task::{Context, Poll};
cfg_net_unix! {
/// An I/O object representing a Unix datagram socket.
@@ -83,6 +84,178 @@ cfg_net_unix! {
}
impl UnixDatagram {
+ /// Wait for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_recv()` or `try_send()`. It
+ /// can be used to concurrently recv / send to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently receive from and send to the socket on the same task
+ /// without splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::Interest;
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// let mut data = [0; 1024];
+ /// match socket.try_recv(&mut data[..]) {
+ /// Ok(n) => {
+ /// println!("received {:?}", &data[..n]);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Write some data
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// println!("sent {} bytes", n);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Wait for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is
+ /// usually paired with `try_send()` or `try_send_to()`.
+ ///
+ /// The function may complete without the socket being writable. This is a
+ /// false-positive and attempting a `try_send()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Wait for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_recv()`.
+ ///
+ /// The function may complete without the socket being readable. This is a
+ /// false-positive and attempting a `try_recv()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
/// Creates a new `UnixDatagram` bound to the specified path.
///
/// # Examples
@@ -309,68 +482,91 @@ impl UnixDatagram {
/// Try to send a datagram to the peer without waiting.
///
/// # Examples
- /// ```
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
- /// use tokio::net::UnixDatagram;
///
- /// let bytes = b"bytes";
- /// // We use a socket pair so that they are assigned
- /// // each other as a peer.
- /// let (first, second) = UnixDatagram::pair()?;
- ///
- /// let size = first.try_send(bytes)?;
- /// assert_eq!(size, bytes.len());
- ///
- /// let mut buffer = vec![0u8; 24];
- /// let size = second.try_recv(&mut buffer)?;
- ///
- /// let dgram = &buffer[..size];
- /// assert_eq!(dgram, bytes);
- /// # Ok(())
- /// # }
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
/// ```
pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
- self.io.send(buf)
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send(buf))
}
/// Try to send a datagram to the peer without waiting.
///
/// # Examples
- /// ```
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
- /// use tokio::net::UnixDatagram;
- /// use tempfile::tempdir;
- ///
- /// let bytes = b"bytes";
- /// // We use a temporary directory so that the socket
- /// // files left by the bound sockets will get cleaned up.
- /// let tmp = tempdir().unwrap();
///
- /// let server_path = tmp.path().join("server");
- /// let server = UnixDatagram::bind(&server_path)?;
- ///
- /// let client_path = tmp.path().join("client");
- /// let client = UnixDatagram::bind(&client_path)?;
- ///
- /// let size = client.try_send_to(bytes, &server_path)?;
- /// assert_eq!(size, bytes.len());
- ///
- /// let mut buffer = vec![0u8; 24];
- /// let (size, addr) = server.try_recv_from(&mut buffer)?;
- ///
- /// let dgram = &buffer[..size];
- /// assert_eq!(dgram, bytes);
- /// assert_eq!(addr.as_pathname().unwrap(), &client_path);
- /// # Ok(())
- /// # }
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send_to(b"hello world", &server_path) {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
/// ```
pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path>,
{
- self.io.send_to(buf, target)
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
}
/// Receives data from the socket.
@@ -409,29 +605,51 @@ impl UnixDatagram {
/// Try to receive a datagram from the peer without waiting.
///
/// # Examples
- /// ```
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
- /// use tokio::net::UnixDatagram;
- ///
- /// let bytes = b"bytes";
- /// // We use a socket pair so that they are assigned
- /// // each other as a peer.
- /// let (first, second) = UnixDatagram::pair()?;
- ///
- /// let size = first.try_send(bytes)?;
- /// assert_eq!(size, bytes.len());
- ///
- /// let mut buffer = vec![0u8; 24];
- /// let size = second.try_recv(&mut buffer)?;
///
- /// let dgram = &buffer[..size];
- /// assert_eq!(dgram, bytes);
- /// # Ok(())
- /// # }
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
/// ```
pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
- self.io.recv(buf)
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv(buf))
}
/// Sends data on the socket to the specified address.
@@ -520,40 +738,195 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}
- /// Try to receive data from the socket without waiting.
+ /// Attempts to receive a single datagram on the specified address.
///
- /// # Examples
- /// ```
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
- /// use tokio::net::UnixDatagram;
- /// use tempfile::tempdir;
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
///
- /// let bytes = b"bytes";
- /// // We use a temporary directory so that the socket
- /// // files left by the bound sockets will get cleaned up.
- /// let tmp = tempdir().unwrap();
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
///
- /// let server_path = tmp.path().join("server");
- /// let server = UnixDatagram::bind(&server_path)?;
+ /// # Errors
///
- /// let client_path = tmp.path().join("client");
- /// let client = UnixDatagram::bind(&client_path)?;
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_recv_from(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<SocketAddr>> {
+ let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+
+ self.io.recv_from(b)
+ }))?;
+
+ // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ Poll::Ready(Ok(SocketAddr(addr)))
+ }
+
+ /// Attempts to send data to the specified address.
///
- /// let size = client.try_send_to(bytes, &server_path)?;
- /// assert_eq!(size, bytes.len());
+ /// Note that on multiple calls to a `poll_*` method in the send direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
///
- /// let mut buffer = vec![0u8; 24];
- /// let (size, addr) = server.try_recv_from(&mut buffer)?;
+ /// # Return value
///
- /// let dgram = &buffer[..size];
- /// assert_eq!(dgram, bytes);
- /// assert_eq!(addr.as_pathname().unwrap(), &client_path);
- /// # Ok(())
- /// # }
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_send_to<P>(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ target: P,
+ ) -> Poll<io::Result<usize>>
+ where
+ P: AsRef<Path>,
+ {
+ self.io
+ .registration()
+ .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
+ }
+
+ /// Attempts to send data on the socket to the remote address to which it
+ /// was previously `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the send direction,
+ /// only the `Waker` from the `Context` passed to the most recent call will
+ /// be scheduled to receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not available to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+ self.io
+ .registration()
+ .poll_write_io(cx, || self.io.send(buf))
+ }
+
+ /// Attempts to receive a single datagram message on the socket from the remote
+ /// address to which it is `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address. This method
+ /// resolves to an error if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
+ let n = ready!(self.io.registration().poll_read_io(cx, || {
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+
+ self.io.recv(b)
+ }))?;
+
+ // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ Poll::Ready(Ok(()))
+ }
+
+ /// Try to receive data from the socket without waiting.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_from(&mut buf) {
+ /// Ok((n, _addr)) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
/// ```
pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- let (n, addr) = self.io.recv_from(buf)?;
+ let (n, addr) = self
+ .io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
+
Ok((n, SocketAddr(addr)))
}
diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs
index ec2f6f82..cdabd7b1 100644
--- a/tokio/tests/uds_datagram.rs
+++ b/tokio/tests/uds_datagram.rs
@@ -2,6 +2,8 @@
#![cfg(feature = "full")]
#![cfg(unix)]
+use futures::future::poll_fn;
+use tokio::io::ReadBuf;
use tokio::net::UnixDatagram;
use tokio::try_join;
@@ -82,6 +84,8 @@ async fn try_send_recv_never_block() -> io::Result<()> {
// Send until we hit the OS `net.unix.max_dgram_qlen`.
loop {
+ dgram1.writable().await.unwrap();
+
match dgram1.try_send(payload) {
Err(err) => match err.kind() {
io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
@@ -96,6 +100,7 @@ async fn try_send_recv_never_block() -> io::Result<()> {
// Read every dgram we sent.
while count > 0 {
+ dgram2.readable().await.unwrap();
let len = dgram2.try_recv(&mut recv_buf[..])?;
assert_eq!(len, payload.len());
assert_eq!(payload, &recv_buf[..len]);
@@ -134,3 +139,94 @@ async fn split() -> std::io::Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn send_to_recv_from_poll() -> std::io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let sender_path = dir.path().join("sender.sock");
+ let receiver_path = dir.path().join("receiver.sock");
+
+ let sender = UnixDatagram::bind(&sender_path)?;
+ let receiver = UnixDatagram::bind(&receiver_path)?;
+
+ let msg = b"hello";
+ poll_fn(|cx| sender.poll_send_to(cx, msg, &receiver_path)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), msg);
+ assert_eq!(addr.as_pathname(), Some(sender_path.as_ref()));
+ Ok(())
+}
+
+#[tokio::test]
+async fn send_recv_poll() -> std::io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let sender_path = dir.path().join("sender.sock");
+ let receiver_path = dir.path().join("receiver.sock");
+
+ let sender = UnixDatagram::bind(&sender_path)?;
+ let receiver = UnixDatagram::bind(&receiver_path)?;
+
+ sender.connect(&receiver_path)?;
+ receiver.connect(&sender_path)?;
+
+ let msg = b"hello";
+ poll_fn(|cx| sender.poll_send(cx, msg)).await?;
+
+ let mut recv_buf = [0u8; 32];
+ let mut read = ReadBuf::new(&mut recv_buf);
+ let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
+
+ assert_eq!(read.filled(), msg);
+ Ok(())
+}
+
+#[tokio::test]
+async fn try_send_to_recv_from() -> std::io::Result<()> {
+ let dir = tempfile::tempdir().unwrap();
+ let server_path = dir.path().join("server.sock");
+ let client_path = dir.path().join("client.sock");
+
+ // Create listener
+ let server = UnixDatagram::bind(&server_path)?;
+
+ // Create socket pair
+ let client = UnixDatagram::bind(&client_path)?;
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await?;
+
+ match client.try_send_to(b"hello world", &server_path) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await?;
+
+ let mut buf = [0; 512];
+
+ match server.try_recv_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+
+ Ok(())
+}