diff options
Diffstat (limited to 'tokio/src/net/udp')
-rw-r--r-- | tokio/src/net/udp/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/net/udp/socket.rs | 1252 |
2 files changed, 0 insertions, 1255 deletions
diff --git a/tokio/src/net/udp/mod.rs b/tokio/src/net/udp/mod.rs deleted file mode 100644 index c9bb0f83..00000000 --- a/tokio/src/net/udp/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! UDP utility types. - -pub(crate) mod socket; diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs deleted file mode 100644 index 3775714c..00000000 --- a/tokio/src/net/udp/socket.rs +++ /dev/null @@ -1,1252 +0,0 @@ -use crate::io::{Interest, PollEvented, ReadBuf, Ready}; -use crate::net::{to_socket_addrs, ToSocketAddrs}; - -use std::convert::TryFrom; -use std::fmt; -use std::io; -use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::task::{Context, Poll}; - -cfg_net! { - /// A UDP socket - /// - /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket` - /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`: - /// - /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`) - /// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses - /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`) - /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address - /// - /// `UdpSocket` can also be used concurrently to `send_to` and `recv_from` in different tasks, - /// all that's required is that you `Arc<UdpSocket>` and clone a reference for each task. - /// - /// # Streams - /// - /// If you need to listen over UDP and produce a [`Stream`](`crate::stream::Stream`), you can look - /// at [`UdpFramed`]. - /// - /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html - /// - /// # Example: one to many (bind) - /// - /// Using `bind` we can create a simple echo server that sends and recv's with many different clients: - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; - /// let mut buf = [0; 1024]; - /// loop { - /// let (len, addr) = sock.recv_from(&mut buf).await?; - /// println!("{:?} bytes received from {:?}", len, addr); - /// - /// let len = sock.send_to(&buf[..len], addr).await?; - /// println!("{:?} bytes sent", len); - /// } - /// } - /// ``` - /// - /// # Example: one to one (connect) - /// - /// Or using `connect` we can echo with a single remote address using `send` and `recv`: - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; - /// - /// let remote_addr = "127.0.0.1:59611"; - /// sock.connect(remote_addr).await?; - /// let mut buf = [0; 1024]; - /// loop { - /// let len = sock.recv(&mut buf).await?; - /// println!("{:?} bytes received from {:?}", len, remote_addr); - /// - /// let len = sock.send(&buf[..len]).await?; - /// println!("{:?} bytes sent", len); - /// } - /// } - /// ``` - /// - /// # Example: Sending/Receiving concurrently - /// - /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright to `Arc<UdpSocket>` - /// and share the references to multiple tasks, in order to send/receive concurrently. Here is - /// a similar "echo" example but that supports concurrent sending/receiving: - /// - /// ```no_run - /// use tokio::{net::UdpSocket, sync::mpsc}; - /// use std::{io, net::SocketAddr, sync::Arc}; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; - /// let r = Arc::new(sock); - /// let s = r.clone(); - /// let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000); - /// - /// tokio::spawn(async move { - /// while let Some((bytes, addr)) = rx.recv().await { - /// let len = s.send_to(&bytes, &addr).await.unwrap(); - /// println!("{:?} bytes sent", len); - /// } - /// }); - /// - /// let mut buf = [0; 1024]; - /// loop { - /// let (len, addr) = r.recv_from(&mut buf).await?; - /// println!("{:?} bytes received from {:?}", len, addr); - /// tx.send((buf[..len].to_vec(), addr)).await.unwrap(); - /// } - /// } - /// ``` - /// - pub struct UdpSocket { - io: PollEvented<mio::net::UdpSocket>, - } -} - -impl UdpSocket { - /// This function will create a new UDP socket and attempt to bind it to - /// the `addr` provided. - /// - /// # Example - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; - /// // use `sock` - /// # let _ = sock; - /// Ok(()) - /// } - /// ``` - pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> { - let addrs = to_socket_addrs(addr).await?; - let mut last_err = None; - - for addr in addrs { - match UdpSocket::bind_addr(addr) { - Ok(socket) => return Ok(socket), - Err(e) => last_err = Some(e), - } - } - - Err(last_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any address", - ) - })) - } - - fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> { - let sys = mio::net::UdpSocket::bind(addr)?; - UdpSocket::new(sys) - } - - fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> { - let io = PollEvented::new(socket)?; - Ok(UdpSocket { io }) - } - - /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`. - /// - /// This function is intended to be used to wrap a UDP socket from the - /// standard library in the Tokio equivalent. The conversion assumes nothing - /// about the underlying socket; it is left up to the user to set it in - /// non-blocking mode. - /// - /// This can be used in conjunction with socket2's `Socket` interface to - /// configure a socket before it's handed off, such as setting options like - /// `reuse_address` or binding to multiple addresses. - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - /// - /// # Example - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; - /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); - /// let std_sock = std::net::UdpSocket::bind(addr)?; - /// std_sock.set_nonblocking(true)?; - /// let sock = UdpSocket::from_std(std_sock)?; - /// // use `sock` - /// # Ok(()) - /// # } - /// ``` - pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { - let io = mio::net::UdpSocket::from_std(socket); - UdpSocket::new(io) - } - - /// Returns the local address that this socket is bound to. - /// - /// # Example - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; - /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); - /// let sock = UdpSocket::bind(addr).await?; - /// // the address the socket is bound to - /// let local_addr = sock.local_addr()?; - /// # Ok(()) - /// # } - /// ``` - pub fn local_addr(&self) -> io::Result<SocketAddr> { - self.io.local_addr() - } - - /// Connects the UDP socket setting the default destination for send() and - /// limiting packets that are read via recv from the address specified in - /// `addr`. - /// - /// # Example - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; - /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; - /// - /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap(); - /// sock.connect(remote_addr).await?; - /// let mut buf = [0u8; 32]; - /// // recv from remote_addr - /// let len = sock.recv(&mut buf).await?; - /// // send to remote_addr - /// let _len = sock.send(&buf[..len]).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> { - let addrs = to_socket_addrs(addr).await?; - let mut last_err = None; - - for addr in addrs { - match self.io.connect(addr) { - Ok(_) => return Ok(()), - Err(e) => last_err = Some(e), - } - } - - Err(last_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any address", - ) - })) - } - - /// Wait for any of the requested ready states. - /// - /// This function is usually paired with `try_recv()` or `try_send()`. It - /// can be used to concurrently recv / send to the same socket on a single - /// task without splitting the socket. - /// - /// The function may complete without the socket being ready. This is a - /// false-positive and attempting an operation will return with - /// `io::ErrorKind::WouldBlock`. - /// - /// # Examples - /// - /// Concurrently receive from and send to the socket on the same task - /// without splitting. - /// - /// ```no_run - /// use tokio::io::{self, Interest}; - /// use tokio::net::UdpSocket; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; - /// - /// loop { - /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; - /// - /// if ready.is_readable() { - /// // The buffer is **not** included in the async task and will only exist - /// // on the stack. - /// let mut data = [0; 1024]; - /// match socket.try_recv(&mut data[..]) { - /// Ok(n) => { - /// println!("received {:?}", &data[..n]); - /// } - /// // False-positive, continue - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} - /// Err(e) => { - /// return Err(e); - /// } - /// } - /// } - /// - /// if ready.is_writable() { - /// // Write some data - /// match socket.try_send(b"hello world") { - /// Ok(n) => { - /// println!("sent {} bytes", n); - /// } - /// // False-positive, continue - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} - /// Err(e) => { - /// return Err(e); - /// } - /// } - /// } - /// } - /// } - /// ``` - pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { - let event = self.io.registration().readiness(interest).await?; - Ok(event.ready) - } - - /// Wait for the socket to become writable. - /// - /// This function is equivalent to `ready(Interest::WRITABLE)` and is - /// usually paired with `try_send()` or `try_send_to()`. - /// - /// The function may complete without the socket being writable. This is a - /// false-positive and attempting a `try_send()` will return with - /// `io::ErrorKind::WouldBlock`. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// // Bind socket - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; - /// - /// loop { - /// // Wait for the socket to be writable - /// socket.writable().await?; - /// - /// // Try to send data, this may still fail with `WouldBlock` - /// // if the readiness event is a false positive. - /// match socket.try_send(b"hello world") { - /// Ok(n) => { - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// continue; - /// } - /// Err(e) => { - /// return Err(e); - /// } - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` - pub async fn writable(&self) -> io::Result<()> { - self.ready(Interest::WRITABLE).await?; - Ok(()) - } - - /// Sends data on the socket to the remote address that the socket is - /// connected to. - /// - /// The [`connect`] method will connect this socket to a remote address. - /// This method will fail if the socket is not connected. - /// - /// [`connect`]: method@Self::connect - /// - /// # Return - /// - /// On success, the number of bytes sent is returned, otherwise, the - /// encountered error is returned. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::io; - /// use tokio::net::UdpSocket; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// // Bind socket - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; - /// - /// // Send a message - /// socket.send(b"hello world").await?; - /// - /// Ok(()) - /// } - /// ``` - pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { - self.io - .registration() - .async_io(Interest::WRITABLE, || self.io.send(buf)) - .await - } - - /// Attempts to send data on the socket to the remote address to which it - /// was previously `connect`ed. - /// - /// The [`connect`] method will connect this socket to a remote address. - /// This method will fail if the socket is not connected. - /// - /// Note that on multiple calls to a `poll_*` method in the send direction, - /// only the `Waker` from the `Context` passed to the most recent call will - /// be scheduled to receive a wakeup. - /// - /// # Return value - /// - /// The function returns: - /// - /// * `Poll::Pending` if the socket is not available to write - /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent - /// * `Poll::Ready(Err(e))` if an error is encountered. - /// - /// # Errors - /// - /// This function may encounter any standard I/O error except `WouldBlock`. - /// - /// [`connect`]: method@Self::connect - pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { - self.io - .registration() - .poll_write_io(cx, || self.io.send(buf)) - } - - /// Try to send data on the socket to the remote address to which it is - /// connected. - /// - /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is - /// returned. This function is usually paired with `writable()`. - /// - /// # Returns - /// - /// If successful, `Ok(n)` is returned, where `n` is the number of bytes - /// sent. If the socket is not ready to send data, - /// `Err(ErrorKind::WouldBlock)` is returned. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// // Bind a UDP socket - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// - /// // Connect to a peer - /// socket.connect("127.0.0.1:8081").await?; - /// - /// loop { - /// // Wait for the socket to be writable - /// socket.writable().await?; - /// - /// // Try to send data, this may still fail with `WouldBlock` - /// // if the readiness event is a false positive. - /// match socket.try_send(b"hello world") { - /// Ok(n) => { - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// continue; - /// } - /// Err(e) => { - /// return Err(e); - /// } - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` - pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { - self.io - .registration() - .try_io(Interest::WRITABLE, || self.io.send(buf)) - } - - /// Wait for the socket to become readable. - /// - /// This function is equivalent to `ready(Interest::READABLE)` and is usually - /// paired with `try_recv()`. - /// - /// The function may complete without the socket being readable. This is a - /// false-positive and attempting a `try_recv()` will return with - /// `io::ErrorKind::WouldBlock`. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// // Connect to a peer - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; - /// - /// loop { - /// // Wait for the socket to be readable - /// socket.readable().await?; - /// - /// // The buffer is **not** included in the async task and will - /// // only exist on the stack. - /// let mut buf = [0; 1024]; - /// - /// // Try to recv data, this may still fail with `WouldBlock` - /// // if the readiness event is a false positive. - /// match socket.try_recv(&mut buf) { - /// Ok(n) => { - /// println!("GOT {:?}", &buf[..n]); - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// continue; - /// } - /// Err(e) => { - /// return Err(e); - /// } - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` - pub async fn readable(&self) -> io::Result<()> { - self.ready(Interest::READABLE).await?; - Ok(()) - } - - /// Receives a single datagram message on the socket from the remote address - /// to which it is connected. On success, returns the number of bytes read. - /// - /// The function must be called with valid byte array `buf` of sufficient - /// size to hold the message bytes. If a message is too long to fit in the - /// supplied buffer, excess bytes may be discarded. - /// - /// The [`connect`] method will connect this socket to a remote address. - /// This method will fail if the socket is not connected. - /// - /// [`connect`]: method@Self::connect - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// // Bind socket - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; - /// - /// let mut buf = vec![0; 10]; - /// let n = socket.recv(&mut buf).await?; - /// - /// println!("received {} bytes {:?}", n, &buf[..n]); - /// - /// Ok(()) - /// } - /// ``` - pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { - self.io - .registration() - .async_io(Interest::READABLE, || self.io.recv(buf)) - .await - } - - /// Attempts to receive a single datagram message on the socket from the remote - /// address to which it is `connect`ed. - /// - /// The [`connect`] method will connect this socket to a remote address. This method - /// resolves to an error if the socket is not connected. - /// - /// Note that on multiple calls to a `poll_*` method in the recv direction, only the - /// `Waker` from the `Context` passed to the most recent call will be scheduled to - /// receive a wakeup. - /// - /// # Return value - /// - /// The function returns: - /// - /// * `Poll::Pending` if the socket is not ready to read - /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready - /// * `Poll::Ready(Err(e))` if an error is encountered. - /// - /// # Errors - /// - /// This function may encounter any standard I/O error except `WouldBlock`. - /// - /// [`connect`]: method@Self::connect - pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> { - let n = ready!(self.io.registration().poll_read_io(cx, || { - // Safety: will not read the maybe uinitialized bytes. - let b = unsafe { - &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) - }; - - self.io.recv(b) - }))?; - - // Safety: We trust `recv` to have filled up `n` bytes in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - Poll::Ready(Ok(())) - } - - /// Try to receive a single datagram message on the socket from the remote - /// address to which it is connected. On success, returns the number of - /// bytes read. - /// - /// The function must be called with valid byte array buf of sufficient size - /// to hold the message bytes. If a message is too long to fit in the - /// supplied buffer, excess bytes may be discarded. - /// - /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is - /// returned. This function is usually paired with `readable()`. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// // Connect to a peer - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; - /// - /// loop { - /// // Wait for the socket to be readable - /// socket.readable().await?; - /// - /// // The buffer is **not** included in the async task and will - /// // only exist on the stack. - /// let mut buf = [0; 1024]; - /// - /// // Try to recv data, this may still fail with `WouldBlock` - /// // if the readiness event is a false positive. - /// match socket.try_recv(&mut buf) { - /// Ok(n) => { - /// println!("GOT {:?}", &buf[..n]); - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// continue; - /// } - /// Err(e) => { - /// return Err(e); - /// } - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` - pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> { - self.io - .registration() - .try_io(Interest::READABLE, || self.io.recv(buf)) - } - - /// Sends data on the socket to the given address. On success, returns the - /// number of bytes written. - /// - /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its - /// documentation for concrete examples. - /// - /// It is possible for `addr` to yield multiple addresses, but `send_to` - /// will only send data to the first address yielded by `addr`. - /// - /// This will return an error when the IP version of the local socket does - /// not match that returned from [`ToSocketAddrs`]. - /// - /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs - /// - /// # Example - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?; - /// - /// println!("Sent {} bytes", len); - /// - /// Ok(()) - /// } - /// ``` - pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> { - let mut addrs = to_socket_addrs(target).await?; - - match addrs.next() { - Some(target) => self.send_to_addr(buf, target).await, - None => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "no addresses to send data to", - )), - } - } - - /// Attempts to send data on the socket to a given address. - /// - /// Note that on multiple calls to a `poll_*` method in the send direction, only the - /// `Waker` from the `Context` passed to the most recent call will be scheduled to - /// receive a wakeup. - /// - /// # Return value - /// - /// The function returns: - /// - /// * `Poll::Pending` if the socket is not ready to write - /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent. - /// * `Poll::Ready(Err(e))` if an error is encountered. - /// - /// # Errors - /// - /// This function may encounter any standard I/O error except `WouldBlock`. - pub fn poll_send_to( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: SocketAddr, - ) -> Poll<io::Result<usize>> { - self.io - .registration() - .poll_write_io(cx, || self.io.send_to(buf, target)) - } - - /// Try to send data on the socket to the given address, but if the send is - /// blocked this will return right away. - /// - /// This function is usually paired with `writable()`. - /// - /// # Returns - /// - /// If successfull, returns the number of bytes sent - /// - /// Users should ensure that when the remote cannot receive, the - /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur - /// if the IP version of the socket does not match that of `target`. - /// - /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock - /// - /// # Example - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::error::Error; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box<dyn Error>> { - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// - /// let dst = "127.0.0.1:8081".parse()?; - /// - /// loop { - /// socket.writable().await?; - /// - /// match socket.try_send_to(&b"hello world"[..], dst) { - /// Ok(sent) => { - /// println!("sent {} bytes", sent); - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// // Writable false positive. - /// continue; - /// } - /// Err(e) => return Err(e.into()), - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` - pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { - self.io - .registration() - .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) - } - - async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { - self.io - .registration() - .async_io(Interest::WRITABLE, || self.io.send_to(buf, target)) - .await - } - - /// Receives a single datagram message on the socket. On success, returns - /// the number of bytes read and the origin. - /// - /// The function must be called with valid byte array `buf` of sufficient - /// size to hold the message bytes. If a message is too long to fit in the - /// supplied buffer, excess bytes may be discarded. - /// - /// # Example - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// - /// let mut buf = vec![0u8; 32]; - /// let (len, addr) = socket.recv_from(&mut buf).await?; - /// - /// println!("received {:?} bytes from {:?}", len, addr); - /// - /// Ok(()) - /// } - /// ``` - pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.io - .registration() - .async_io(Interest::READABLE, || self.io.recv_from(buf)) - .await - } - - /// Attempts to receive a single datagram on the socket. - /// - /// Note that on multiple calls to a `poll_*` method in the recv direction, only the - /// `Waker` from the `Context` passed to the most recent call will be scheduled to - /// receive a wakeup. - /// - /// # Return value - /// - /// The function returns: - /// - /// * `Poll::Pending` if the socket is not ready to read - /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready - /// * `Poll::Ready(Err(e))` if an error is encountered. - /// - /// # Errors - /// - /// This function may encounter any standard I/O error except `WouldBlock`. - pub fn poll_recv_from( - &self, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll<io::Result<SocketAddr>> { - let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { - // Safety: will not read the maybe uinitialized bytes. - let b = unsafe { - &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) - }; - - self.io.recv_from(b) - }))?; - - // Safety: We trust `recv` to have filled up `n` bytes in the buffer. - unsafe { - buf.assume_init(n); - } - buf.advance(n); - Poll::Ready(Ok(addr)) - } - - /// Try to receive a single datagram message on the socket. On success, - /// returns the number of bytes read and the origin. - /// - /// The function must be called with valid byte array buf of sufficient size - /// to hold the message bytes. If a message is too long to fit in the - /// supplied buffer, excess bytes may be discarded. - /// - /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is - /// returned. This function is usually paired with `readable()`. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// // Connect to a peer - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// socket.connect("127.0.0.1:8081").await?; - /// - /// loop { - /// // Wait for the socket to be readable - /// socket.readable().await?; - /// - /// // The buffer is **not** included in the async task and will - /// // only exist on the stack. - /// let mut buf = [0; 1024]; - /// - /// // Try to recv data, this may still fail with `WouldBlock` - /// // if the readiness event is a false positive. - /// match socket.try_recv(&mut buf) { - /// Ok(n) => { - /// println!("GOT {:?}", &buf[..n]); - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// continue; - /// } - /// Err(e) => { - /// return Err(e); - /// } - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` - pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.io - .registration() - .try_io(Interest::READABLE, || self.io.recv_from(buf)) - } - - /// Receives data from the socket, without removing it from the input queue. - /// On success, returns the number of bytes read and the address from whence - /// the data came. - /// - /// # Notes - /// - /// On Windows, if the data is larger than the buffer specified, the buffer - /// is filled with the first part of the data, and peek_from returns the error - /// WSAEMSGSIZE(10040). The excess data is lost. - /// Make sure to always use a sufficiently large buffer to hold the - /// maximum UDP packet size, which can be up to 65536 bytes in size. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::UdpSocket; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; - /// - /// let mut buf = vec![0u8; 32]; - /// let (len, addr) = socket.peek_from(&mut buf).await?; - /// - /// println!("peeked {:?} bytes from {:?}", len, addr); - /// - /// Ok(()) - /// } - /// ``` - pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - self.io - .registration() - .async_io(Interest::READABLE, || self.io.peek_from(buf)) - .await - } - - /// Receives data from the socket, without removing it from the input queue. - /// On success, returns the number of bytes read. - /// - /// # Notes - /// - /// Note that on multiple calls to a `poll_*` method in the recv direction, only the - /// `Waker` from the `Context` passed to the most recent call will be scheduled to - /// receive a wakeup - /// - /// On Windows, if the data is larger than the buffer specified, the buffer - /// is filled with the first part of the data, and peek returns the error - /// WSAEMSGSIZE(10040). The excess data is lost. - /// Make sure to always use a sufficiently large buffer to hold the - /// maximum UDP packet size, which can be up to 65536 bytes in size. - /// - /// # Return value - /// - /// The function returns: - /// - /// * `Poll::Pending` if the socket is not ready to read - /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready - /// * `Poll::Ready(Err(e))` if an error is encountered. - /// - /// # Errors - /// - /// This function may encounter any standard I/O error except `WouldBlock`. - pub fn poll_peek_from( - &self, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, < |