summaryrefslogtreecommitdiffstats
path: root/tokio/src/net/udp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/net/udp.rs')
-rw-r--r--tokio/src/net/udp.rs1252
1 files changed, 1252 insertions, 0 deletions
diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs
new file mode 100644
index 00000000..3775714c
--- /dev/null
+++ b/tokio/src/net/udp.rs
@@ -0,0 +1,1252 @@
+use crate::io::{Interest, PollEvented, ReadBuf, Ready};
+use crate::net::{to_socket_addrs, ToSocketAddrs};
+
+use std::convert::TryFrom;
+use std::fmt;
+use std::io;
+use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
+use std::task::{Context, Poll};
+
+cfg_net! {
+ /// A UDP socket
+ ///
+ /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket`
+ /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`:
+ ///
+ /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`)
+ /// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses
+ /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`)
+ /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address
+ ///
+ /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks,
+ /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task.
+ ///
+ /// # Streams
+ ///
+ /// If you need to listen over UDP and produce a [`Stream`](`crate::stream::Stream`), you can look
+ /// at [`UdpFramed`].
+ ///
+ /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html
+ ///
+ /// # Example: one to many (bind)
+ ///
+ /// Using `bind` we can create a simple echo server that sends and recv's with many different clients:
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
+ /// let mut buf = [0; 1024];
+ /// loop {
+ /// let (len, addr) = sock.recv_from(&mut buf).await?;
+ /// println!("{:?} bytes received from {:?}", len, addr);
+ ///
+ /// let len = sock.send_to(&buf[..len], addr).await?;
+ /// println!("{:?} bytes sent", len);
+ /// }
+ /// }
+ /// ```
+ ///
+ /// # Example: one to one (connect)
+ ///
+ /// Or using `connect` we can echo with a single remote address using `send` and `recv`:
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
+ ///
+ /// let remote_addr = "127.0.0.1:59611";
+ /// sock.connect(remote_addr).await?;
+ /// let mut buf = [0; 1024];
+ /// loop {
+ /// let len = sock.recv(&mut buf).await?;
+ /// println!("{:?} bytes received from {:?}", len, remote_addr);
+ ///
+ /// let len = sock.send(&buf[..len]).await?;
+ /// println!("{:?} bytes sent", len);
+ /// }
+ /// }
+ /// ```
+ ///
+ /// # Example: Sending/Receiving concurrently
+ ///
+ /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>`
+ /// and share the references to multiple tasks, in order to send/receive concurrently. Here is
+ /// a similar "echo" example but that supports concurrent sending/receiving:
+ ///
+ /// ```no_run
+ /// use tokio::{net::UdpSocket, sync::mpsc};
+ /// use std::{io, net::SocketAddr, sync::Arc};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ /// let r = Arc::new(sock);
+ /// let s = r.clone();
+ /// let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);
+ ///
+ /// tokio::spawn(async move {
+ /// while let Some((bytes, addr)) = rx.recv().await {
+ /// let len = s.send_to(&bytes, &addr).await.unwrap();
+ /// println!("{:?} bytes sent", len);
+ /// }
+ /// });
+ ///
+ /// let mut buf = [0; 1024];
+ /// loop {
+ /// let (len, addr) = r.recv_from(&mut buf).await?;
+ /// println!("{:?} bytes received from {:?}", len, addr);
+ /// tx.send((buf[..len].to_vec(), addr)).await.unwrap();
+ /// }
+ /// }
+ /// ```
+ ///
+ pub struct UdpSocket {
+ io: PollEvented<mio::net::UdpSocket>,
+ }
+}
+
+impl UdpSocket {
+ /// This function will create a new UDP socket and attempt to bind it to
+ /// the `addr` provided.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080").await?;
+ /// // use `sock`
+ /// # let _ = sock;
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> {
+ let addrs = to_socket_addrs(addr).await?;
+ let mut last_err = None;
+
+ for addr in addrs {
+ match UdpSocket::bind_addr(addr) {
+ Ok(socket) => return Ok(socket),
+ Err(e) => last_err = Some(e),
+ }
+ }
+
+ Err(last_err.unwrap_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "could not resolve to any address",
+ )
+ }))
+ }
+
+ fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> {
+ let sys = mio::net::UdpSocket::bind(addr)?;
+ UdpSocket::new(sys)
+ }
+
+ fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> {
+ let io = PollEvented::new(socket)?;
+ Ok(UdpSocket { io })
+ }
+
+ /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`.
+ ///
+ /// This function is intended to be used to wrap a UDP socket from the
+ /// standard library in the Tokio equivalent. The conversion assumes nothing
+ /// about the underlying socket; it is left up to the user to set it in
+ /// non-blocking mode.
+ ///
+ /// This can be used in conjunction with socket2's `Socket` interface to
+ /// configure a socket before it's handed off, such as setting options like
+ /// `reuse_address` or binding to multiple addresses.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if thread-local runtime is not set.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
+ /// let std_sock = std::net::UdpSocket::bind(addr)?;
+ /// std_sock.set_nonblocking(true)?;
+ /// let sock = UdpSocket::from_std(std_sock)?;
+ /// // use `sock`
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
+ let io = mio::net::UdpSocket::from_std(socket);
+ UdpSocket::new(io)
+ }
+
+ /// Returns the local address that this socket is bound to.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
+ /// let sock = UdpSocket::bind(addr).await?;
+ /// // the address the socket is bound to
+ /// let local_addr = sock.local_addr()?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr()
+ }
+
+ /// Connects the UDP socket setting the default destination for send() and
+ /// limiting packets that are read via recv from the address specified in
+ /// `addr`.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// # use std::{io, net::SocketAddr};
+ ///
+ /// # #[tokio::main]
+ /// # async fn main() -> io::Result<()> {
+ /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
+ ///
+ /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap();
+ /// sock.connect(remote_addr).await?;
+ /// let mut buf = [0u8; 32];
+ /// // recv from remote_addr
+ /// let len = sock.recv(&mut buf).await?;
+ /// // send to remote_addr
+ /// let _len = sock.send(&buf[..len]).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> {
+ let addrs = to_socket_addrs(addr).await?;
+ let mut last_err = None;
+
+ for addr in addrs {
+ match self.io.connect(addr) {
+ Ok(_) => return Ok(()),
+ Err(e) => last_err = Some(e),
+ }
+ }
+
+ Err(last_err.unwrap_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "could not resolve to any address",
+ )
+ }))
+ }
+
+ /// Wait for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_recv()` or `try_send()`. It
+ /// can be used to concurrently recv / send to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently receive from and send to the socket on the same task
+ /// without splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::{self, Interest};
+ /// use tokio::net::UdpSocket;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// // The buffer is **not** included in the async task and will only exist
+ /// // on the stack.
+ /// let mut data = [0; 1024];
+ /// match socket.try_recv(&mut data[..]) {
+ /// Ok(n) => {
+ /// println!("received {:?}", &data[..n]);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Write some data
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// println!("sent {} bytes", n);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Wait for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is
+ /// usually paired with `try_send()` or `try_send_to()`.
+ ///
+ /// The function may complete without the socket being writable. This is a
+ /// false-positive and attempting a `try_send()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Sends data on the socket to the remote address that the socket is
+ /// connected to.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
+ ///
+ /// [`connect`]: method@Self::connect
+ ///
+ /// # Return
+ ///
+ /// On success, the number of bytes sent is returned, otherwise, the
+ /// encountered error is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::io;
+ /// use tokio::net::UdpSocket;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// // Send a message
+ /// socket.send(b"hello world").await?;
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .async_io(Interest::WRITABLE, || self.io.send(buf))
+ .await
+ }
+
+ /// Attempts to send data on the socket to the remote address to which it
+ /// was previously `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the send direction,
+ /// only the `Waker` from the `Context` passed to the most recent call will
+ /// be scheduled to receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not available to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+ self.io
+ .registration()
+ .poll_write_io(cx, || self.io.send(buf))
+ }
+
+ /// Try to send data on the socket to the remote address to which it is
+ /// connected.
+ ///
+ /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `writable()`.
+ ///
+ /// # Returns
+ ///
+ /// If successful, `Ok(n)` is returned, where `n` is the number of bytes
+ /// sent. If the socket is not ready to send data,
+ /// `Err(ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind a UDP socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// // Connect to a peer
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send(buf))
+ }
+
+ /// Wait for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_recv()`.
+ ///
+ /// The function may complete without the socket being readable. This is a
+ /// false-positive and attempting a `try_recv()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
+ /// Receives a single datagram message on the socket from the remote address
+ /// to which it is connected. On success, returns the number of bytes read.
+ ///
+ /// The function must be called with valid byte array `buf` of sufficient
+ /// size to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
+ ///
+ /// [`connect`]: method@Self::connect
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Bind socket
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// let mut buf = vec![0; 10];
+ /// let n = socket.recv(&mut buf).await?;
+ ///
+ /// println!("received {} bytes {:?}", n, &buf[..n]);
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .async_io(Interest::READABLE, || self.io.recv(buf))
+ .await
+ }
+
+ /// Attempts to receive a single datagram message on the socket from the remote
+ /// address to which it is `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address. This method
+ /// resolves to an error if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
+ let n = ready!(self.io.registration().poll_read_io(cx, || {
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+
+ self.io.recv(b)
+ }))?;
+
+ // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ Poll::Ready(Ok(()))
+ }
+
+ /// Try to receive a single datagram message on the socket from the remote
+ /// address to which it is connected. On success, returns the number of
+ /// bytes read.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv(buf))
+ }
+
+ /// Sends data on the socket to the given address. On success, returns the
+ /// number of bytes written.
+ ///
+ /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its
+ /// documentation for concrete examples.
+ ///
+ /// It is possible for `addr` to yield multiple addresses, but `send_to`
+ /// will only send data to the first address yielded by `addr`.
+ ///
+ /// This will return an error when the IP version of the local socket does
+ /// not match that returned from [`ToSocketAddrs`].
+ ///
+ /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?;
+ ///
+ /// println!("Sent {} bytes", len);
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> {
+ let mut addrs = to_socket_addrs(target).await?;
+
+ match addrs.next() {
+ Some(target) => self.send_to_addr(buf, target).await,
+ None => Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "no addresses to send data to",
+ )),
+ }
+ }
+
+ /// Attempts to send data on the socket to a given address.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the send direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_send_to(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ target: SocketAddr,
+ ) -> Poll<io::Result<usize>> {
+ self.io
+ .registration()
+ .poll_write_io(cx, || self.io.send_to(buf, target))
+ }
+
+ /// Try to send data on the socket to the given address, but if the send is
+ /// blocked this will return right away.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Returns
+ ///
+ /// If successfull, returns the number of bytes sent
+ ///
+ /// Users should ensure that when the remote cannot receive, the
+ /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur
+ /// if the IP version of the socket does not match that of `target`.
+ ///
+ /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// let dst = "127.0.0.1:8081".parse()?;
+ ///
+ /// loop {
+ /// socket.writable().await?;
+ ///
+ /// match socket.try_send_to(&b"hello world"[..], dst) {
+ /// Ok(sent) => {
+ /// println!("sent {} bytes", sent);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// // Writable false positive.
+ /// continue;
+ /// }
+ /// Err(e) => return Err(e.into()),
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
+ }
+
+ async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
+ self.io
+ .registration()
+ .async_io(Interest::WRITABLE, || self.io.send_to(buf, target))
+ .await
+ }
+
+ /// Receives a single datagram message on the socket. On success, returns
+ /// the number of bytes read and the origin.
+ ///
+ /// The function must be called with valid byte array `buf` of sufficient
+ /// size to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// # Example
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// let mut buf = vec![0u8; 32];
+ /// let (len, addr) = socket.recv_from(&mut buf).await?;
+ ///
+ /// println!("received {:?} bytes from {:?}", len, addr);
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .registration()
+ .async_io(Interest::READABLE, || self.io.recv_from(buf))
+ .await
+ }
+
+ /// Attempts to receive a single datagram on the socket.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_recv_from(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<SocketAddr>> {
+ let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+
+ self.io.recv_from(b)
+ }))?;
+
+ // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ Poll::Ready(Ok(addr))
+ }
+
+ /// Try to receive a single datagram message on the socket. On success,
+ /// returns the number of bytes read and the origin.
+ ///
+ /// The function must be called with valid byte array buf of sufficient size
+ /// to hold the message bytes. If a message is too long to fit in the
+ /// supplied buffer, excess bytes may be discarded.
+ ///
+ /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
+ /// returned. This function is usually paired with `readable()`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ /// socket.connect("127.0.0.1:8081").await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv_from(buf))
+ }
+
+ /// Receives data from the socket, without removing it from the input queue.
+ /// On success, returns the number of bytes read and the address from whence
+ /// the data came.
+ ///
+ /// # Notes
+ ///
+ /// On Windows, if the data is larger than the buffer specified, the buffer
+ /// is filled with the first part of the data, and peek_from returns the error
+ /// WSAEMSGSIZE(10040). The excess data is lost.
+ /// Make sure to always use a sufficiently large buffer to hold the
+ /// maximum UDP packet size, which can be up to 65536 bytes in size.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UdpSocket;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let socket = UdpSocket::bind("127.0.0.1:8080").await?;
+ ///
+ /// let mut buf = vec![0u8; 32];
+ /// let (len, addr) = socket.peek_from(&mut buf).await?;
+ ///
+ /// println!("peeked {:?} bytes from {:?}", len, addr);
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .registration()
+ .async_io(Interest::READABLE, || self.io.peek_from(buf))
+ .await
+ }
+
+ /// Receives data from the socket, without removing it from the input queue.
+ /// On success, returns the number of bytes read.
+ ///
+ /// # Notes
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup
+ ///
+ /// On Windows, if the data is larger than the buffer specified, the buffer
+ /// is filled with the first part of the data, and peek returns the error
+ /// WSAEMSGSIZE(10040). The excess data is lost.
+ /// Make sure to always use a sufficiently large buffer to hold the
+ /// maximum UDP packet size, which can be up to 65536 bytes in size.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_peek_from(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<SocketAddr>> {
+ let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
+ // Safety: will not read the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+
+ self.io.peek_from(b)
+ }))?;
+
+ // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }