summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjean-airoldie <25088801+jean-airoldie@users.noreply.github.com>2020-07-25 06:34:47 -0400
committerGitHub <noreply@github.com>2020-07-25 12:34:47 +0200
commit2d97d5ad15a19665615ed38219d3b7798df7e250 (patch)
tree208c62fc018feafccc567ef3415b844d72d97a88
parentd1744bf260384838e00311230faf7787a97f477b (diff)
net: add try_recv/from & try_send/to to UnixDatagram (#1677)
This allows nonblocking sync send & recv operations on the socket.
-rw-r--r--tokio/src/net/unix/datagram/socket.rs77
-rw-r--r--tokio/tests/uds_datagram.rs40
2 files changed, 117 insertions, 0 deletions
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index 2fe5654e..a332d2af 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -85,6 +85,73 @@ impl UnixDatagram {
poll_fn(|cx| self.poll_send_priv(cx, buf)).await
}
+ /// Try to send a datagram to the peer without waiting.
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// let bytes = b"bytes";
+ /// // We use a socket pair so that they are assigned
+ /// // each other as a peer.
+ /// let (mut first, mut second) = UnixDatagram::pair()?;
+ ///
+ /// let size = first.try_send(bytes)?;
+ /// assert_eq!(size, bytes.len());
+ ///
+ /// let mut buffer = vec![0u8; 24];
+ /// let size = second.try_recv(&mut buffer)?;
+ ///
+ /// let dgram = &buffer.as_slice()[..size];
+ /// assert_eq!(dgram, bytes);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.io.get_ref().send(buf)
+ }
+
+ /// Try to send a datagram to the peer without waiting.
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// use {
+ /// tokio::net::UnixDatagram,
+ /// tempfile::tempdir,
+ /// };
+ ///
+ /// let bytes = b"bytes";
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir().unwrap();
+ ///
+ /// let server_path = tmp.path().join("server");
+ /// let mut server = UnixDatagram::bind(&server_path)?;
+ ///
+ /// let client_path = tmp.path().join("client");
+ /// let mut client = UnixDatagram::bind(&client_path)?;
+ ///
+ /// let size = client.try_send_to(bytes, &server_path)?;
+ /// assert_eq!(size, bytes.len());
+ ///
+ /// let mut buffer = vec![0u8; 24];
+ /// let (size, addr) = server.try_recv_from(&mut buffer)?;
+ ///
+ /// let dgram = &buffer.as_slice()[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &client_path);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
+ where
+ P: AsRef<Path>,
+ {
+ self.io.get_ref().send_to(buf, target)
+ }
+
// Poll IO functions that takes `&self` are provided for the split API.
//
// They are not public because (taken from the doc of `PollEvented`):
@@ -116,6 +183,11 @@ impl UnixDatagram {
poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
}
+ /// Try to receive a datagram from the peer without waiting.
+ pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.get_ref().recv(buf)
+ }
+
pub(crate) fn poll_recv_priv(
&self,
cx: &mut Context<'_>,
@@ -162,6 +234,11 @@ impl UnixDatagram {
poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await
}
+ /// Try to receive data from the socket without waiting.
+ pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io.get_ref().recv_from(buf)
+ }
+
pub(crate) fn poll_recv_from_priv(
&self,
cx: &mut Context<'_>,
diff --git a/tokio/tests/uds_datagram.rs b/tokio/tests/uds_datagram.rs
index cfb1c649..d3c3535e 100644
--- a/tokio/tests/uds_datagram.rs
+++ b/tokio/tests/uds_datagram.rs
@@ -43,6 +43,46 @@ async fn echo() -> io::Result<()> {
Ok(())
}
+// Even though we use sync non-blocking io we still need a reactor.
+#[tokio::test]
+async fn try_send_recv_never_block() -> io::Result<()> {
+ let mut recv_buf = [0u8; 16];
+ let payload = b"PAYLOAD";
+ let mut count = 0;
+
+ let (mut dgram1, mut dgram2) = UnixDatagram::pair()?;
+
+ // Send until we hit the OS `net.unix.max_dgram_qlen`.
+ loop {
+ match dgram1.try_send(payload) {
+ Err(err) => match err.kind() {
+ io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
+ _ => unreachable!("unexpected error {:?}", err),
+ },
+ Ok(len) => {
+ assert_eq!(len, payload.len());
+ }
+ }
+ count += 1;
+ }
+
+ // Read every dgram we sent.
+ while count > 0 {
+ let len = dgram2.try_recv(&mut recv_buf[..])?;
+ assert_eq!(len, payload.len());
+ assert_eq!(payload, &recv_buf[..len]);
+ count -= 1;
+ }
+
+ let err = dgram2.try_recv(&mut recv_buf[..]).unwrap_err();
+ match err.kind() {
+ io::ErrorKind::WouldBlock => (),
+ _ => unreachable!("unexpected error {:?}", err),
+ }
+
+ Ok(())
+}
+
#[tokio::test]
async fn split() -> std::io::Result<()> {
let dir = tempfile::tempdir().unwrap();