diff options
author | jean-airoldie <25088801+jean-airoldie@users.noreply.github.com> | 2020-07-25 06:34:47 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-25 12:34:47 +0200 |
commit | 2d97d5ad15a19665615ed38219d3b7798df7e250 (patch) | |
tree | 208c62fc018feafccc567ef3415b844d72d97a88 | |
parent | d1744bf260384838e00311230faf7787a97f477b (diff) |
net: add try_recv/from & try_send/to to UnixDatagram (#1677)
This allows nonblocking sync send & recv operations on the socket.
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs | 77 | ||||
-rw-r--r-- | tokio/tests/uds_datagram.rs | 40 |
2 files changed, 117 insertions, 0 deletions
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 2fe5654e..a332d2af 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -85,6 +85,73 @@ impl UnixDatagram { poll_fn(|cx| self.poll_send_priv(cx, buf)).await } + /// Try to send a datagram to the peer without waiting. + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// use tokio::net::UnixDatagram; + /// + /// let bytes = b"bytes"; + /// // We use a socket pair so that they are assigned + /// // each other as a peer. + /// let (mut first, mut second) = UnixDatagram::pair()?; + /// + /// let size = first.try_send(bytes)?; + /// assert_eq!(size, bytes.len()); + /// + /// let mut buffer = vec![0u8; 24]; + /// let size = second.try_recv(&mut buffer)?; + /// + /// let dgram = &buffer.as_slice()[..size]; + /// assert_eq!(dgram, bytes); + /// # Ok(()) + /// # } + /// ``` + pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> { + self.io.get_ref().send(buf) + } + + /// Try to send a datagram to the peer without waiting. + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn std::error::Error>> { + /// use { + /// tokio::net::UnixDatagram, + /// tempfile::tempdir, + /// }; + /// + /// let bytes = b"bytes"; + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir().unwrap(); + /// + /// let server_path = tmp.path().join("server"); + /// let mut server = UnixDatagram::bind(&server_path)?; + /// + /// let client_path = tmp.path().join("client"); + /// let mut client = UnixDatagram::bind(&client_path)?; + /// + /// let size = client.try_send_to(bytes, &server_path)?; + /// assert_eq!(size, bytes.len()); + /// + /// let mut buffer = vec![0u8; 24]; + /// let (size, addr) = server.try_recv_from(&mut buffer)?; + /// + /// let dgram = &buffer.as_slice()[..size]; + /// assert_eq!(dgram, bytes); + /// assert_eq!(addr.as_pathname().unwrap(), &client_path); + /// # Ok(()) + /// # } + /// ``` + pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + where + P: AsRef<Path>, + { + self.io.get_ref().send_to(buf, target) + } + // Poll IO functions that takes `&self` are provided for the split API. // // They are not public because (taken from the doc of `PollEvented`): @@ -116,6 +183,11 @@ impl UnixDatagram { poll_fn(|cx| self.poll_recv_priv(cx, buf)).await } + /// Try to receive a datagram from the peer without waiting. + pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.io.get_ref().recv(buf) + } + pub(crate) fn poll_recv_priv( &self, cx: &mut Context<'_>, @@ -162,6 +234,11 @@ impl UnixDatagram { poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await } + /// Try to receive data from the socket without waiting. + pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io.get_ref().recv_from(buf) + } + pub(crate) fn poll_recv_from_priv( &self, cx: &mut Context<'_>, diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs index cfb1c649..d3c3535e 100644 --- a/tokio/tests/uds_datagram.rs +++ b/tokio/tests/uds_datagram.rs @@ -43,6 +43,46 @@ async fn echo() -> io::Result<()> { Ok(()) } +// Even though we use sync non-blocking io we still need a reactor. +#[tokio::test] +async fn try_send_recv_never_block() -> io::Result<()> { + let mut recv_buf = [0u8; 16]; + let payload = b"PAYLOAD"; + let mut count = 0; + + let (mut dgram1, mut dgram2) = UnixDatagram::pair()?; + + // Send until we hit the OS `net.unix.max_dgram_qlen`. + loop { + match dgram1.try_send(payload) { + Err(err) => match err.kind() { + io::ErrorKind::WouldBlock | io::ErrorKind::Other => break, + _ => unreachable!("unexpected error {:?}", err), + }, + Ok(len) => { + assert_eq!(len, payload.len()); + } + } + count += 1; + } + + // Read every dgram we sent. + while count > 0 { + let len = dgram2.try_recv(&mut recv_buf[..])?; + assert_eq!(len, payload.len()); + assert_eq!(payload, &recv_buf[..len]); + count -= 1; + } + + let err = dgram2.try_recv(&mut recv_buf[..]).unwrap_err(); + match err.kind() { + io::ErrorKind::WouldBlock => (), + _ => unreachable!("unexpected error {:?}", err), + } + + Ok(()) +} + #[tokio::test] async fn split() -> std::io::Result<()> { let dir = tempfile::tempdir().unwrap(); |