use crate::future::poll_fn; use crate::io::PollEvented; use crate::net::udp::split::{split, RecvHalf, SendHalf}; use crate::net::ToSocketAddrs; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::task::{Context, Poll}; cfg_udp! { /// A UDP socket pub struct UdpSocket { io: PollEvented, } } impl UdpSocket { /// This function will create a new UDP socket and attempt to bind it to /// the `addr` provided. pub async fn bind(addr: A) -> io::Result { let addrs = addr.to_socket_addrs().await?; let mut last_err = None; for addr in addrs { match UdpSocket::bind_addr(addr) { Ok(socket) => return Ok(socket), Err(e) => last_err = Some(e), } } Err(last_err.unwrap_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, "could not resolve to any address", ) })) } fn bind_addr(addr: SocketAddr) -> io::Result { let sys = mio::net::UdpSocket::bind(&addr)?; UdpSocket::new(sys) } fn new(socket: mio::net::UdpSocket) -> io::Result { let io = PollEvented::new(socket)?; Ok(UdpSocket { io }) } /// Creates a new `UdpSocket` from the previously bound socket provided. /// /// The socket given will be registered with the event loop that `handle` /// is associated with. This function requires that `socket` has previously /// been bound to an address to work correctly. /// /// This can be used in conjunction with net2's `UdpBuilder` interface to /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result { let io = mio::net::UdpSocket::from_socket(socket)?; let io = PollEvented::new(io)?; Ok(UdpSocket { io }) } /// Splits the `UdpSocket` into a receive half and a send half. The two parts /// can be used to receive and send datagrams concurrently, even from two /// different tasks. pub fn split(self) -> (RecvHalf, SendHalf) { split(self) } /// Returns the local address that this socket is bound to. pub fn local_addr(&self) -> io::Result { self.io.get_ref().local_addr() } /// Connects the UDP socket setting the default destination for send() and /// limiting packets that are read via recv from the address specified in /// `addr`. pub async fn connect(&self, addr: A) -> io::Result<()> { let addrs = addr.to_socket_addrs().await?; let mut last_err = None; for addr in addrs { match self.io.get_ref().connect(addr) { Ok(_) => return Ok(()), Err(e) => last_err = Some(e), } } Err(last_err.unwrap_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, "could not resolve to any address", ) })) } /// Returns a future that sends data on the socket to the remote address to which it is connected. /// On success, the future will resolve to the number of bytes written. /// /// The [`connect`] method will connect this socket to a remote address. The future /// will resolve to an error if the socket is not connected. /// /// [`connect`]: method@Self::connect pub async fn send(&mut self, buf: &[u8]) -> io::Result { poll_fn(|cx| self.poll_send(cx, buf)).await } /// Try to send data on the socket to the remote address to which it is /// connected. /// /// # Returns /// /// If successfull, the number of bytes sent is returned. Users /// should ensure that when the remote cannot receive, the /// [`ErrorKind::WouldBlock`] is properly handled. /// /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result { self.io.get_ref().send(buf) } // Poll IO functions that takes `&self` are provided for the split API. // // They are not public because (taken from the doc of `PollEvented`): // // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the // caller must ensure that there are at most two tasks that use a // `PollEvented` instance concurrently. One for reading and one for writing. // While violating this requirement is "safe" from a Rust memory model point // of view, it will result in unexpected behavior in the form of lost // notifications and tasks hanging. #[doc(hidden)] pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { ready!(self.io.poll_write_ready(cx))?; match self.io.get_ref().send(buf) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_write_ready(cx)?; Poll::Pending } x => Poll::Ready(x), } } /// Returns a future that receives a single datagram message on the socket from /// the remote address to which it is connected. On success, the future will resolve /// to the number of bytes read. /// /// The function must be called with valid byte array `buf` of sufficient size to /// hold the message bytes. If a message is too long to fit in the supplied buffer, /// excess bytes may be discarded. /// /// The [`connect`] method will connect this socket to a remote address. The future /// will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result { poll_fn(|cx| self.poll_recv(cx, buf)).await } #[doc(hidden)] pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().recv(buf) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_read_ready(cx, mio::Ready::readable())?; Poll::Pending } x => Poll::Ready(x), } } /// Returns a future that sends data on the socket to the given address. /// On success, the future will resolve to the number of bytes written. /// /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. pub async fn send_to(&mut self, buf: &[u8], target: A) -> io::Result { let mut addrs = target.to_socket_addrs().await?; match addrs.next() { Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", )), } } /// Try to send data on the socket to the given address. /// /// # Returns /// /// If successfull, the future resolves to the number of bytes sent. /// /// Users should ensure that when the remote cannot receive, the /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur /// if the IP version of the socket does not match that of `target`. /// /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io.get_ref().send_to(buf, &target) } // TODO: Public or not? #[doc(hidden)] pub fn poll_send_to( &self, cx: &mut Context<'_>, buf: &[u8], target: &SocketAddr, ) -> Poll> { ready!(self.io.poll_write_ready(cx))?; match self.io.get_ref().send_to(buf, target) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_write_ready(cx)?; Poll::Pending } x => Poll::Ready(x), } } /// Returns a future that receives a single datagram on the socket. On success, /// the future resolves to the number of bytes read and the origin. /// /// The function must be called with valid byte array `buf` of sufficient size /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { poll_fn(|cx| self.poll_recv_from(cx, buf)).await } #[doc(hidden)] pub fn poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().recv_from(buf) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_read_ready(cx, mio::Ready::readable())?; Poll::Pending } x => Poll::Ready(x), } } /// Gets the value of the `SO_BROADCAST` option for this socket. /// /// For more information about this option, see [`set_broadcast`]. /// /// [`set_broadcast`]: method@Self::set_broadcast pub fn broadcast(&self) -> io::Result { self.io.get_ref().broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. /// /// When enabled, this socket is allowed to send packets to a broadcast /// address. pub fn set_broadcast(&self, on: bool) -> io::Result<()> { self.io.get_ref().set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. /// /// For more information about this option, see [`set_multicast_loop_v4`]. /// /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4 pub fn multicast_loop_v4(&self) -> io::Result { self.io.get_ref().multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. /// /// If enabled, multicast packets will be looped back to the local socket. /// /// # Note /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { self.io.get_ref().set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. /// /// For more information about this option, see [`set_multicast_ttl_v4`]. /// /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4 pub fn multicast_ttl_v4(&self) -> io::Result { self.io.get_ref().multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. /// /// Indicates the time-to-live value of outgoing multicast packets for /// this socket. The default value is 1 which means that multicast packets /// don't leave the local network unless explicitly requested. /// /// # Note /// /// This may not have any affect on IPv6 sockets. pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { self.io.get_ref().set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. /// /// For more information about this option, see [`set_multicast_loop_v6`]. /// /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6 pub fn multicast_loop_v6(&self) -> io::Result { self.io.get_ref().multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. /// /// Controls whether this socket sees the multicast packets it sends itself. /// /// # Note /// /// This may not have any affect on IPv4 sockets. pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { self.io.get_ref().set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`]. /// /// [`set_ttl`]: method@Self::set_ttl pub fn ttl(&self) -> io::Result { self.io.get_ref().ttl() } /// Sets the value for the `IP_TTL` option on this socket. /// /// This value sets the time-to-live field that is used in every packet sent /// from this socket. pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.io.get_ref().set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. /// /// This function specifies a new multicast group for this socket to join. /// The address must be a valid multicast address, and `interface` is the /// address of the local interface with which the system should join the /// multicast group. If it's equal to `INADDR_ANY` then an appropriate /// interface is chosen by the system. pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { self.io.get_ref().join_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. /// /// This function specifies a new multicast group for this socket to join. /// The address must be a valid multicast address, and `interface` is the /// index of the interface to join/leave (or 0 to indicate any interface). pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { self.io.get_ref().join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. /// /// For more information about this option, see [`join_multicast_v4`]. /// /// [`join_multicast_v4`]: method@Self::join_multicast_v4 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { self.io.get_ref().leave_multicast_v4(&multiaddr, &interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. /// /// For more information about this option, see [`join_multicast_v6`]. /// /// [`join_multicast_v6`]: method@Self::join_multicast_v6 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { self.io.get_ref().leave_multicast_v6(multiaddr, interface) } } impl TryFrom for mio::net::UdpSocket { type Error = io::Error; /// Consumes value, returning the mio I/O object. /// /// See [`PollEvented::into_inner`] for more details about /// resource deregistration that happens during the call. /// /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UdpSocket) -> Result { value.io.into_inner() } } impl TryFrom for UdpSocket { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). fn try_from(stream: net::UdpSocket) -> Result { Self::from_std(stream) } } impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.io.get_ref().fmt(f) } } #[cfg(all(unix))] mod sys { use super::UdpSocket; use std::os::unix::prelude::*; impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> RawFd { self.io.get_ref().as_raw_fd() } } } #[cfg(windows)] mod sys { // TODO: let's land these upstream with mio and then we can add them here. // // use std::os::windows::prelude::*; // use super::UdpSocket; // // impl AsRawHandle for UdpSocket { // fn as_raw_handle(&self) -> RawHandle { // self.io.get_ref().as_raw_handle() // } // } }