From 0ea23076503c5151d68a781a3d91823396c82949 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 16 Nov 2020 15:44:01 -0800 Subject: net: add UdpSocket readiness and non-blocking ops (#3138) Adds `ready()`, `readable()`, and `writable()` async methods for waiting for socket readiness. Adds `try_send`, `try_send_to`, `try_recv`, and `try_recv_from` for performing non-blocking operations on the socket. This is the UDP equivalent of #3130. --- tokio/src/net/tcp/stream.rs | 5 +- tokio/src/net/udp/socket.rs | 564 +++++++++++++++++++++++++++++++++++++------- tokio/tests/udp.rs | 90 +++++++ 3 files changed, 571 insertions(+), 88 deletions(-) diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 8a157e1c..7427ba54 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -385,7 +385,7 @@ impl TcpStream { /// If data is successfully read, `Ok(n)` is returned, where `n` is the /// number of bytes read. `Ok(n)` indicates the stream's read half is closed /// and will no longer yield data. If the stream is not ready to read data - /// `Err(io::ErrorKinid::WouldBlock)` is returned. + /// `Err(io::ErrorKind::WouldBlock)` is returned. /// /// # Examples /// @@ -495,8 +495,7 @@ impl TcpStream { /// The function will attempt to write the entire contents of `buf`, but /// only part of the buffer may be written. /// - /// This function is equivalent to `ready(Interest::WRITABLE)` is usually - /// paired with `try_write()`. + /// This function is usually paired with `writable()`. /// /// # Return /// diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index f8b6a787..a9c5c868 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -1,4 +1,4 @@ -use crate::io::{Interest, PollEvented, ReadBuf}; +use crate::io::{Interest, PollEvented, ReadBuf, Ready}; use crate::net::{to_socket_addrs, ToSocketAddrs}; use std::convert::TryFrom; @@ -262,13 +262,149 @@ impl UdpSocket { })) } - /// Returns a future that sends data on the socket to the remote address to which it is connected. - /// On success, the future will resolve to the number of bytes written. + /// Wait for any of the requested ready states. /// - /// The [`connect`] method will connect this socket to a remote address. The future - /// will resolve to an error if the socket is not connected. + /// This function is usually paired with `try_recv()` or `try_send()`. It + /// can be used to concurrently recv / send to the same socket on a single + /// task without splitting the socket. + /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// Concurrently receive from and send to the socket on the same task + /// without splitting. + /// + /// ```no_run + /// use tokio::io::{self, Interest}; + /// use tokio::net::UdpSocket; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// // The buffer is **not** included in the async task and will only exist + /// // on the stack. + /// let mut data = [0; 1024]; + /// match socket.try_recv(&mut data[..]) { + /// Ok(n) => { + /// println!("received {:?}", &data[..n]); + /// } + /// // False-positive, continue + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// if ready.is_writable() { + /// // Write some data + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// println!("sent {} bytes", n); + /// } + /// // False-positive, continue + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// } + /// } + /// ``` + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Wait for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is + /// usually paired with `try_send()` or `try_send_to()`. + /// + /// The function may complete without the socket being writable. This is a + /// false-positive and attempting a `try_send()` will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Bind socket + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Sends data on the socket to the remote address that the socket is + /// connected to. + /// + /// The [`connect`] method will connect this socket to a remote address. + /// This method will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect + /// + /// # Return + /// + /// On success, the number of bytes sent is returned, otherwise, the + /// encountered error is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::io; + /// use tokio::net::UdpSocket; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Bind socket + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// // Send a message + /// socket.send(b"hello world").await?; + /// + /// Ok(()) + /// } + /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { self.io .registration() @@ -276,15 +412,15 @@ impl UdpSocket { .await } - /// Attempts to send data on the socket to the remote address to which it was previously - /// `connect`ed. + /// Attempts to send data on the socket to the remote address to which it + /// was previously `connect`ed. /// - /// The [`connect`] method will connect this socket to a remote address. The future - /// will resolve to an error if the socket is not connected. + /// The [`connect`] method will connect this socket to a remote address. + /// This method will fail if the socket is not connected. /// - /// Note that on multiple calls to a `poll_*` method in the send direction, only the - /// `Waker` from the `Context` passed to the most recent call will be scheduled to - /// receive a wakeup. + /// Note that on multiple calls to a `poll_*` method in the send direction, + /// only the `Waker` from the `Context` passed to the most recent call will + /// be scheduled to receive a wakeup. /// /// # Return value /// @@ -308,29 +444,140 @@ impl UdpSocket { /// Try to send data on the socket to the remote address to which it is /// connected. /// + /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `writable()`. + /// /// # Returns /// - /// If successfull, the number of bytes sent is returned. Users - /// should ensure that when the remote cannot receive, the - /// [`ErrorKind::WouldBlock`] is properly handled. + /// If successful, `Ok(n)` is returned, where `n` is the number of bytes + /// sent. If the socket is not ready to send data, + /// `Err(ErrorKind::WouldBlock)` is returned. /// - /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Bind a UDP socket + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// + /// // Connect to a peer + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// socket.writable().await?; + /// + /// // Try to send data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_send(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` pub fn try_send(&self, buf: &[u8]) -> io::Result { - self.io.send(buf) + self.io + .registration() + .try_io(Interest::WRITABLE, || self.io.send(buf)) + } + + /// Wait for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_recv()`. + /// + /// The function may complete without the socket being readable. This is a + /// false-positive and attempting a `try_recv()` will return with + /// `io::ErrorKind::WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) } - /// Returns a future that receives a single datagram message on the socket from - /// the remote address to which it is connected. On success, the future will resolve - /// to the number of bytes read. + /// Receives a single datagram message on the socket from the remote address + /// to which it is connected. On success, returns the number of bytes read. /// - /// The function must be called with valid byte array `buf` of sufficient size to - /// hold the message bytes. If a message is too long to fit in the supplied buffer, - /// excess bytes may be discarded. + /// The function must be called with valid byte array `buf` of sufficient + /// size to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. /// - /// The [`connect`] method will connect this socket to a remote address. The future - /// will fail if the socket is not connected. + /// The [`connect`] method will connect this socket to a remote address. + /// This method will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Bind socket + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// let mut buf = vec![0; 10]; + /// let n = socket.recv(&mut buf).await?; + /// + /// println!("received {} bytes {:?}", n, &buf[..n]); + /// + /// Ok(()) + /// } + /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { self.io .registration() @@ -379,26 +626,91 @@ impl UdpSocket { Poll::Ready(Ok(())) } - /// Returns a future that sends data on the socket to the given address. - /// On success, the future will resolve to the number of bytes written. + /// Try to receive a single datagram message on the socket from the remote + /// address to which it is connected. On success, returns the number of + /// bytes read. + /// + /// The function must be called with valid byte array buf of sufficient size + /// to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv(&self, buf: &mut [u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || self.io.recv(buf)) + } + + /// Sends data on the socket to the given address. On success, returns the + /// number of bytes written. + /// + /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its + /// documentation for concrete examples. /// - /// The future will resolve to an error if the IP version of the socket does - /// not match that of `target`. + /// It is possible for `addr` to yield multiple addresses, but `send_to` + /// will only send data to the first address yielded by `addr`. + /// + /// This will return an error when the IP version of the local socket does + /// not match that returned from [`ToSocketAddrs`]. + /// + /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; + /// use std::io; /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::().unwrap()).await?; - /// let buf = b"hello world"; - /// let remote_addr = "127.0.0.1:58000".parse::().unwrap(); - /// let _len = sock.send_to(&buf[..], remote_addr).await?; - /// # Ok(()) - /// # } + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?; + /// + /// println!("Sent {} bytes", len); + /// + /// Ok(()) + /// } /// ``` pub async fn send_to(&self, buf: &[u8], target: A) -> io::Result { let mut addrs = to_socket_addrs(target).await?; @@ -440,8 +752,10 @@ impl UdpSocket { .poll_write_io(cx, || self.io.send_to(buf, *target)) } - /// Try to send data on the socket to the given address, but if the send is blocked - /// this will return right away. + /// Try to send data on the socket to the given address, but if the send is + /// blocked this will return right away. + /// + /// This function is usually paired with `writable()`. /// /// # Returns /// @@ -451,25 +765,44 @@ impl UdpSocket { /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur /// if the IP version of the socket does not match that of `target`. /// + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock + /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; + /// use std::error::Error; + /// use std::io; /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::().unwrap()).await?; - /// let buf = b"hello world"; - /// let remote_addr = "127.0.0.1:58000".parse::().unwrap(); - /// let _len = sock.try_send_to(&buf[..], remote_addr)?; - /// # Ok(()) - /// # } - /// ``` + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; /// - /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock + /// let dst = "127.0.0.1:8081".parse()?; + /// + /// loop { + /// socket.writable().await?; + /// + /// match socket.try_send_to(&b"hello world"[..], dst) { + /// Ok(sent) => { + /// println!("sent {} bytes", sent); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// // Writable false positive. + /// continue; + /// } + /// Err(e) => return Err(e.into()), + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { - self.io.send_to(buf, target) + self.io + .registration() + .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) } async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result { @@ -479,27 +812,30 @@ impl UdpSocket { .await } - /// Returns a future that receives a single datagram on the socket. On success, - /// the future resolves to the number of bytes read and the origin. + /// Receives a single datagram message on the socket. On success, returns + /// the number of bytes read and the origin. /// - /// The function must be called with valid byte array `buf` of sufficient size - /// to hold the message bytes. If a message is too long to fit in the supplied - /// buffer, excess bytes may be discarded. + /// The function must be called with valid byte array `buf` of sufficient + /// size to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. /// /// # Example /// /// ```no_run /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; + /// use std::io; /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::().unwrap()).await?; - /// let mut buf = [0u8; 32]; - /// let (len, addr) = sock.recv_from(&mut buf).await?; - /// println!("received {:?} bytes from {:?}", len, addr); - /// # Ok(()) - /// # } + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0u8; 32]; + /// let (len, addr) = socket.recv_from(&mut buf).await?; + /// + /// println!("received {:?} bytes from {:?}", len, addr); + /// + /// Ok(()) + /// } /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io @@ -547,6 +883,61 @@ impl UdpSocket { Poll::Ready(Ok(addr)) } + /// Try to receive a single datagram message on the socket. On success, + /// returns the number of bytes read and the origin. + /// + /// The function must be called with valid byte array buf of sufficient size + /// to hold the message bytes. If a message is too long to fit in the + /// supplied buffer, excess bytes may be discarded. + /// + /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is + /// returned. This function is usually paired with `readable()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UdpSocket; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Connect to a peer + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// socket.connect("127.0.0.1:8081").await?; + /// + /// loop { + /// // Wait for the socket to be readable + /// socket.readable().await?; + /// + /// // The buffer is **not** included in the async task and will + /// // only exist on the stack. + /// let mut buf = [0; 1024]; + /// + /// // Try to recv data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match socket.try_recv(&mut buf) { + /// Ok(n) => { + /// println!("GOT {:?}", &buf[..n]); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .registration() + .try_io(Interest::READABLE, || self.io.recv_from(buf)) + } + /// Receives data from the socket, without removing it from the input queue. /// On success, returns the number of bytes read and the address from whence /// the data came. @@ -563,16 +954,19 @@ impl UdpSocket { /// /// ```no_run /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; + /// use std::io; /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::().unwrap()).await?; - /// let mut buf = [0u8; 32]; - /// let (len, addr) = sock.peek_from(&mut buf).await?; - /// println!("peeked {:?} bytes from {:?}", len, addr); - /// # Ok(()) - /// # } + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0u8; 32]; + /// let (len, addr) = socket.peek_from(&mut buf).await?; + /// + /// println!("peeked {:?} bytes from {:?}", len, addr); + /// + /// Ok(()) + /// } /// ``` pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.io @@ -795,20 +1189,20 @@ impl UdpSocket { /// /// # Examples /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box> { /// use tokio::net::UdpSocket; + /// use std::io; /// - /// // Create a socket - /// let socket = UdpSocket::bind("0.0.0.0:8080").await?; + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Create a socket + /// let socket = UdpSocket::bind("0.0.0.0:8080").await?; /// - /// if let Ok(Some(err)) = socket.take_error() { - /// println!("Got error: {:?}", err); - /// } + /// if let Ok(Some(err)) = socket.take_error() { + /// println!("Got error: {:?}", err); + /// } /// - /// # Ok(()) - /// # } + /// Ok(()) + /// } /// ``` pub fn take_error(&self) -> io::Result> { self.io.take_error() diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 8b79cb85..291267e0 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -2,6 +2,7 @@ #![cfg(feature = "full")] use futures::future::poll_fn; +use std::io; use std::sync::Arc; use tokio::{io::ReadBuf, net::UdpSocket}; @@ -238,6 +239,8 @@ async fn try_send_spawn() { .await .unwrap(); + sender.writable().await.unwrap(); + let sent = &sender .try_send_to(MSG, receiver.local_addr().unwrap()) .unwrap(); @@ -263,3 +266,90 @@ async fn try_send_spawn() { assert_eq!(received, MSG_LEN * 2 + MSG2_LEN); } + +#[tokio::test] +async fn try_send_recv() { + // Create listener + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + + // Connect the two + client.connect(server.local_addr().unwrap()).await.unwrap(); + server.connect(client.local_addr().unwrap()).await.unwrap(); + + for _ in 0..5 { + loop { + client.writable().await.unwrap(); + + match client.try_send(b"hello world") { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await.unwrap(); + + let mut buf = [0; 512]; + + match server.try_recv(&mut buf) { + Ok(n) => { + assert_eq!(n, 11); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } +} + +#[tokio::test] +async fn try_send_to_recv_from() { + // Create listener + let server = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let saddr = server.local_addr().unwrap(); + + // Create socket pair + let client = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let caddr = client.local_addr().unwrap(); + + for _ in 0..5 { + loop { + client.writable().await.unwrap(); + + match client.try_send_to(b"hello world", saddr) { + Ok(n) => { + assert_eq!(n, 11); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + + loop { + server.readable().await.unwrap(); + + let mut buf = [0; 512]; + + match server.try_recv_from(&mut buf) { + Ok((n, addr)) => { + assert_eq!(n, 11); + assert_eq!(addr, caddr); + assert_eq!(&buf[0..11], &b"hello world"[..]); + break; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("{:?}", e), + } + } + } +} -- cgit v1.2.3