diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-09-23 13:02:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 13:02:15 -0700 |
commit | a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch) | |
tree | 676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/net/unix/datagram/socket.rs | |
parent | f25f12d57638a2928b3f738b3b1392d8773e276e (diff) |
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an instrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
Refs: #2779, #2728
Diffstat (limited to 'tokio/src/net/unix/datagram/socket.rs')
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs | 227 |
1 files changed, 34 insertions, 193 deletions
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index ba3a10c4..78baf279 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,7 +1,4 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; -use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; use std::convert::TryFrom; use std::fmt; @@ -10,7 +7,6 @@ use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; -use std::task::{Context, Poll}; cfg_uds! { /// An I/O object representing a Unix datagram socket. @@ -38,9 +34,9 @@ cfg_uds! { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -64,7 +60,7 @@ cfg_uds! { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -128,7 +124,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -208,12 +204,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Send to the bound socket /// let bytes = b"hello world"; @@ -247,12 +243,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Connect to the bound socket /// tx.connect(&rx_path)?; @@ -284,7 +280,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -300,8 +296,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send a datagram to the peer without waiting. @@ -371,32 +369,6 @@ impl UnixDatagram { self.io.get_ref().send_to(buf, target) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Receives data from the socket. /// /// # Examples @@ -407,7 +379,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -423,8 +395,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv_priv(cx, buf)).await + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .await } /// Try to receive a datagram from the peer without waiting. @@ -455,22 +429,6 @@ impl UnixDatagram { self.io.get_ref().recv(buf) } - pub(crate) fn poll_recv_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Sends data on the socket to the specified address. /// /// # Examples @@ -487,9 +445,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -504,28 +462,15 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> + pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize> where - P: AsRef<Path> + Unpin, + P: AsRef<Path>, { - poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await - } - - pub(crate) fn poll_send_to_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &Path, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + self.io + .async_io(mio::Ready::writable(), |sock| { + sock.send_to(buf, target.as_ref()) + }) + .await } /// Receives data from the socket. @@ -544,9 +489,9 @@ impl UnixDatagram { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -561,8 +506,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .await } /// Try to receive data from the socket without waiting. @@ -601,22 +548,6 @@ impl UnixDatagram { self.io.get_ref().recv_from(buf) } - pub(crate) fn poll_recv_from_priv( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<(usize, SocketAddr), io::Error>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Returns the local address that this socket is bound to. /// /// # Examples @@ -748,7 +679,7 @@ impl UnixDatagram { /// use std::net::Shutdown; /// /// // Create an unbound socket - /// let (mut socket, other) = UnixDatagram::pair()?; + /// let (socket, other) = UnixDatagram::pair()?; /// /// socket.shutdown(Shutdown::Both)?; /// @@ -768,102 +699,12 @@ impl UnixDatagram { pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.get_ref().shutdown(how) } - - // These lifetime markers also appear in the generated documentation, and make - // it more clear that this is a *borrowed* split. - #[allow(clippy::needless_lifetimes)] - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// This method is more efficient than [`into_split`], but the halves cannot - /// be moved into independently spawned tasks. - /// - /// [`into_split`]: fn@crate::net::UnixDatagram::into_split - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box<dyn Error>> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) { - split(self) - } - - /// Split a `UnixDatagram` into a receive half and a send half, which can be used - /// to receive and send the datagram concurrently. - /// - /// Unlike [`split`], the owned halves can be moved to separate tasks, - /// however this comes at the cost of a heap allocation. - /// - /// **Note:** Dropping the write half will shut down the write half of the - /// datagram. This is equivalent to calling [`shutdown(Write)`]. - /// - /// # Examples - /// ``` - /// # use std::error::Error; - /// # #[tokio::main] - /// # async fn main() -> Result<(), Box<dyn Error>> { - /// use tokio::net::UnixDatagram; - /// - /// // Create the pair of sockets - /// let (sock1, mut sock2) = UnixDatagram::pair()?; - /// - /// // Split sock1 - /// let (sock1_rx, mut sock1_tx) = sock1.into_split(); - /// - /// // Since the sockets are paired, the paired send/recv - /// // functions can be used - /// let bytes = b"hello world"; - /// sock1_tx.send(bytes).await?; - /// - /// let mut buff = vec![0u8; 24]; - /// let size = sock2.recv(&mut buff).await?; - /// - /// let dgram = &buff[..size]; - /// assert_eq!(dgram, bytes); - /// - /// # Ok(()) - /// # } - /// ``` - /// - /// [`split`]: fn@crate::net::UnixDatagram::split - /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown - pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) { - split_owned(self) - } } impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> { value.io.into_inner() } |