summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-11-16 15:44:01 -0800
committerGitHub <noreply@github.com>2020-11-16 15:44:01 -0800
commit0ea23076503c5151d68a781a3d91823396c82949 (patch)
tree1e49d7dc0bb3cee6271133d942ba49c5971fde29
parentd0ebb4154748166a4ba07baa4b424a1c45efd219 (diff)
net: add UdpSocket readiness and non-blocking ops (#3138)
Adds `ready()`, `readable()`, and `writable()` async methods for waiting for socket readiness. Adds `try_send`, `try_send_to`, `try_recv`, and `try_recv_from` for performing non-blocking operations on the socket. This is the UDP equivalent of #3130.
-rw-r--r--tokio/src/net/tcp/stream.rs5
-rw-r--r--tokio/src/net/udp/socket.rs564
-rw-r--r--tokio/tests/udp.rs90
3 files changed, 571 insertions, 88 deletions
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 8a157e1c..7427ba54 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -385,7 +385,7 @@ impl TcpStream {
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(n)` indicates the stream's read half is closed
/// and will no longer yield data. If the stream is not ready to read data
- /// `Err(io::ErrorKinid::WouldBlock)` is returned.
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
///
@@ -495,8 +495,7 @@ impl TcpStream {
/// The function will attempt to write the entire contents of `buf`, but
/// only part of the buffer may be written.
///
- /// This function is equivalent to `ready(Interest::WRITABLE)` is usually
- /// paired with `try_write()`.
+ /// This function is usually paired with `writable()`.
///
/// # Return
///
diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs
index f8b6a787..a9c5c868 100644
--- a/tokio/src/net/udp/socket.rs
+++ b/tokio/src/net/udp/socket.rs
@@ -1,4 +1,4 @@
-use crate::io::{Interest, PollEvented, ReadBuf};
+use crate::io::{Interest, PollEvented, ReadBuf, Ready};
use crate::net::{to_socket_addrs, ToSocketAddrs};
use std::convert::TryFrom;
@@ -262,13 +262,149 @@ impl UdpSocket {
}))
}
- /// Returns a future that sends data on the socket to the remote address to which it is connected.
- /// On success, the future will resolve to the number of bytes written.
+ /// Wait for any of the requested ready states.
///
- /// The [`connect`] method will connect this socket to a remote address. The future
- /// will resolve to an error if the socket is not connected.
+ /// This function is usually paired with `try_recv()` or `try_send()`. It
+ /// can be used to concurrently recv / send to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently receive from and send to the socket on the same task
+ /// without splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::{self, Interest};
+ /// use tokio::net::UdpSocket;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// // The buffer is **not** included in the async task and will only exist
+ /// // on the stack.
+ /// let mut data = [0; 1024];
+ /// match socket.try_recv(&mut data[..]) {
+ /// Ok(n) => {
+ /// println!("received {:?}", &data[..n]);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Write some data
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// println!("sent {} bytes", n);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Wait for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is
+ /// usually paired with `try_send()` or `try_send_to()`.
+ ///
+ /// The function may complete without the socket being writable. This is a
+ /// false-positive and attempting a `try_send()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Sends data on the socket to the remote address that the socket is
+ /// connected to.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
///
/// [`connect`]: method@Self::connect
+ ///
+ /// # Return
+ ///
+ /// On success, the number of bytes sent is returned, otherwise, the
+ /// encountered error is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::io;
+ /// use tokio::net::UdpSocket;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// // Send a message
+ /// socket.send(b"hello world").await?;
+ ///
+ /// Ok(())
+ /// }
+ /// ```
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.io
.registration()
@@ -276,15 +412,15 @@ impl UdpSocket {
.await
}
- /// Attempts to send data on the socket to the remote address to which it was previously
- /// `connect`ed.
+ /// Attempts to send data on the socket to the remote address to which it
+ /// was previously `connect`ed.
///
- /// The [`connect`] method will connect this socket to a remote address. The future
- /// will resolve to an error if the socket is not connected.
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
///
- /// Note that on multiple calls to a `poll_*` method in the send direction, only the
- /// `Waker` from the `Context` passed to the most recent call will be scheduled to
- /// receive a wakeup.
+ /// Note that on multiple calls to a `poll_*` method in the send direction,
+ /// only the `Waker` from the `Context` passed to the most recent call will
+ /// be scheduled to receive a wakeup.
///
/// # Return value
///
@@ -308,29 +444,140 @@ impl UdpSocket {
/// Try to send data on the socket to the remote address to which it is
/// connected.
///
+ /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `writable()`.
+ ///
/// # Returns
///
- /// If successfull, the number of bytes sent is returned. Users
- /// should ensure that when the remote cannot receive, the
- /// [`ErrorKind::WouldBlock`] is properly handled.
+ /// If successful, `Ok(n)` is returned, where `n` is the number of bytes
+ /// sent. If the socket is not ready to send data,
+ /// `Err(ErrorKind::WouldBlock)` is returned.
///
- /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind a UDP socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// // Connect to a peer
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
- self.io.send(buf)
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send(buf))
+ }
+
+ /// Wait for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_recv()`.
+ ///
+ /// The function may complete without the socket being readable. This is a
+ /// false-positive and attempting a `try_recv()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
}
- /// Returns a future that receives a single datagram message on the socket from
- /// the remote address to which it is connected. On success, the future will resolve
- /// to the number of bytes read.
+ /// Receives a single datagram message on the socket from the remote address
+ /// to which it is connected. On success, returns the number of bytes read.
///
- /// The function must be called with valid byte array `buf` of sufficient size to
- /// hold the message bytes. If a message is too long to fit in the supplied buffer,
- /// excess bytes may be discarded.
+ /// The function must be called with valid byte array `buf` of sufficient
+ /// size to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
///
- /// The [`connect`] method will connect this socket to a remote address. The future
- /// will fail if the socket is not connected.
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
///
/// [`connect`]: method@Self::connect
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// let mut buf = vec![0; 10];
+ /// let n = socket.recv(&mut buf).await?;
+ ///
+ /// println!("received {} bytes {:?}", n, &buf[..n]);
+ ///
+ /// Ok(())
+ /// }
+ /// ```
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io
.registration()
@@ -379,26 +626,91 @@ impl UdpSocket {
Poll::Ready(Ok(()))
}
- /// Returns a future that sends data on the socket to the given address.
- /// On success, the future will resolve to the number of bytes written.
+ /// Try to receive a single datagram message on the socket from the remote
+ /// address to which it is connected. On success, returns the number of
+ /// bytes read.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv(buf))
+ }
+
+ /// Sends data on the socket to the given address. On success, returns the
+ /// number of bytes written.
+ ///
+ /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its
+ /// documentation for concrete examples.
///
- /// The future will resolve to an error if the IP version of the socket does
- /// not match that of `target`.
+ /// It is possible for `addr` to yield multiple addresses, but `send_to`
+ /// will only send data to the first address yielded by `addr`.
+ ///
+ /// This will return an error when the IP version of the local socket does
+ /// not match that returned from [`ToSocketAddrs`].
+ ///
+ /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs
///
/// # Example
///
/// ```no_run
/// use tokio::net::UdpSocket;
- /// # use std::{io, net::SocketAddr};
+ /// use std::io;
///
- /// # #[tokio::main]
- /// # async fn main() -> io::Result<()> {
- /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
- /// let buf = b"hello world";
- /// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap();
- /// let _len = sock.send_to(&buf[..], remote_addr).await?;
- /// # Ok(())
- /// # }
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?;
+ ///
+ /// println!("Sent {} bytes", len);
+ ///
+ /// Ok(())
+ /// }
/// ```
pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> {
let mut addrs = to_socket_addrs(target).await?;
@@ -440,8 +752,10 @@ impl UdpSocket {
.poll_write_io(cx, || self.io.send_to(buf, *target))
}
- /// Try to send data on the socket to the given address, but if the send is blocked
- /// this will return right away.
+ /// Try to send data on the socket to the given address, but if the send is
+ /// blocked this will return right away.
+ ///
+ /// This function is usually paired with `writable()`.
///
/// # Returns
///
@@ -451,25 +765,44 @@ impl UdpSocket {
/// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur
/// if the IP version of the socket does not match that of `target`.
///
+ /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
+ ///
/// # Example
///
/// ```no_run
/// use tokio::net::UdpSocket;
- /// # use std::{io, net::SocketAddr};
+ /// use std::error::Error;
+ /// use std::io;
///
- /// # #[tokio::main]
- /// # async fn main() -> io::Result<()> {
- /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
- /// let buf = b"hello world";
- /// let remote_addr = "127.0.0.1:58000".parse::<SocketAddr>().unwrap();
- /// let _len = sock.try_send_to(&buf[..], remote_addr)?;
- /// # Ok(())
- /// # }
- /// ```
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
///
- /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
+ /// let dst = "127.0.0.1:8081".parse()?;
+ ///
+ /// loop {
+ /// socket.writable().await?;
+ ///
+ /// match socket.try_send_to(&b"hello world"[..], dst) {
+ /// Ok(sent) => {
+ /// println!("sent {} bytes", sent);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// // Writable false positive.
+ /// continue;
+ /// }
+ /// Err(e) => return Err(e.into()),
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
- self.io.send_to(buf, target)
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
}
async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
@@ -479,27 +812,30 @@ impl UdpSocket {
.await
}
- /// Returns a future that receives a single datagram on the socket. On success,
- /// the future resolves to the number of bytes read and the origin.
+ /// Receives a single datagram message on the socket. On success, returns
+ /// the number of bytes read and the origin.
///
- /// The function must be called with valid byte array `buf` of sufficient size
- /// to hold the message bytes. If a message is too long to fit in the supplied
- /// buffer, excess bytes may be discarded.
+ /// The function must be called with valid byte array `buf` of sufficient
+ /// size to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
///
/// # Example
///
/// ```no_run
/// use tokio::net::UdpSocket;
- /// # use std::{io, net::SocketAddr};
+ /// use std::io;
///
- /// # #[tokio::main]
- /// # async fn main() -> io::Result<()> {
- /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
- /// let mut buf = [0u8; 32];
- /// let (len, addr) = sock.recv_from(&mut buf).await?;
- /// println!("received {:?} bytes from {:?}", len, addr);
- /// # Ok(())
- /// # }
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// let mut buf = vec![0u8; 32];
+ /// let (len, addr) = socket.recv_from(&mut buf).await?;
+ ///
+ /// println!("received {:?} bytes from {:?}", len, addr);
+ ///
+ /// Ok(())
+ /// }
/// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io
@@ -547,6 +883,61 @@ impl UdpSocket {
Poll::Ready(Ok(addr))
}
+ /// Try to receive a single datagram message on the socket. On success,
+ /// returns the number of bytes read and the origin.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv_from(buf))
+ }
+
/// Receives data from the socket, without removing it from the input queue.
/// On success, returns the number of bytes read and the address from whence
/// the data came.
@@ -563,16 +954,19 @@ impl UdpSocket {
///
/// ```no_run
/// use tokio::net::UdpSocket;
- /// # use std::{io, net::SocketAddr};
+ /// use std::io;
///
- /// # #[tokio::main]
- /// # async fn main() -> io::Result<()> {
- /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
- /// let mut buf = [0u8; 32];
- /// let (len, addr) = sock.peek_from(&mut buf).await?;
- /// println!("peeked {:?} bytes from {:?}", len, addr);
- /// # Ok(())
- /// # }
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// let mut buf = vec![0u8; 32];
+ /// let (len, addr) = socket.peek_from(&mut buf).await?;
+ ///
+ /// println!("peeked {:?} bytes from {:?}", len, addr);
+ ///
+ /// Ok(())
+ /// }
/// ```
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io
@@ -795,20 +1189,20 @@ impl UdpSocket {
///
/// # Examples
/// ```
- /// # use std::error::Error;
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn Error>> {
/// use tokio::net::UdpSocket;
+ /// use std::io;
///
- /// // Create a socket
- /// let socket = UdpSocket::bind("0.0.0.0:8080").await?;
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Create a socket
+ /// let socket = UdpSocket::bind("0.0.0.0:8080").await?;
///
- /// if let Ok(Some(err)) = socket.take_error() {
- /// println!("Got error: {:?}", err);
- /// }
+ /// if let Ok(Some(err)) = socket.take_error() {
+ /// println!("Got error: {:?}", err);
+ /// }
///
- /// # Ok(())
- /// # }
+ /// Ok(())
+ /// }
/// ```
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.io.take_error()
diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs
index 8b79cb85..291267e0 100644
--- a/tokio/tests/udp.rs
+++ b/tokio/tests/udp.rs
@@ -2,6 +2,7 @@
#![cfg(feature = "full")]
use futures::future::poll_fn;
+use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
@@ -238,6 +239,8 @@ async fn try_send_spawn() {
.await
.unwrap();
+ sender.writable().await.unwrap();
+
let sent = &sender
.try_send_to(MSG, receiver.local_addr().unwrap())
.unwrap();
@@ -263,3 +266,90 @@ async fn try_send_spawn() {
assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
}
+
+#[tokio::test]
+async fn try_send_recv() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+
+ // Connect the two
+ client.connect(server.local_addr().unwrap()).await.unwrap();
+ server.connect(client.local_addr().unwrap()).await.unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send(b"hello world") {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = [0; 512];
+
+ match server.try_recv(&mut buf) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}
+
+#[tokio::test]
+async fn try_send_to_recv_from() {
+ // Create listener
+ let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let saddr = server.local_addr().unwrap();
+
+ // Create socket pair
+ let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
+ let caddr = client.local_addr().unwrap();
+
+ for _ in 0..5 {
+ loop {
+ client.writable().await.unwrap();
+
+ match client.try_send_to(b"hello world", saddr) {
+ Ok(n) => {
+ assert_eq!(n, 11);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+
+ loop {
+ server.readable().await.unwrap();
+
+ let mut buf = [0; 512];
+
+ match server.try_recv_from(&mut buf) {
+ Ok((n, addr)) => {
+ assert_eq!(n, 11);
+ assert_eq!(addr, caddr);
+ assert_eq!(&buf[0..11], &b"hello world"[..]);
+ break;
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(e) => panic!("{:?}", e),
+ }
+ }
+ }
+}