summaryrefslogtreecommitdiffstats
path: root/tokio/src/net/unix/datagram/socket.rs
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-09-23 13:02:15 -0700
committerGitHub <noreply@github.com>2020-09-23 13:02:15 -0700
commita0557840eb424e174bf81a0175c40f9e176a2cc2 (patch)
tree676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/net/unix/datagram/socket.rs
parentf25f12d57638a2928b3f738b3b1392d8773e276e (diff)
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways: - Cleans up the cached readiness in `PollEvented`. This cache used to be helpful when readiness was a linked list of `*mut Node`s in `Registration`. Previous refactors have turned `Registration` into just an `AtomicUsize` holding the current readiness, so the cache is just extra work and complexity. Gone. - Polling the `Registration` for readiness now gives a `ReadyEvent`, which includes the driver tick. This event must be passed back into `clear_readiness`, so that the readiness is only cleared from `Registration` if the tick hasn't changed. Previously, it was possible to clear the readiness even though another thread had *just* polled the driver and found the socket ready again. - Registration now also contains an `async fn readiness`, which stores wakers in an instrusive linked list. This allows an unbounded number of tasks to register for readiness (previously, only 1 per direction (read and write)). By using the intrusive linked list, there is no concern of leaking the storage of the wakers, since they are stored inside the `async fn` and released when the future is dropped. - Registration retains a `poll_readiness(Direction)` method, to support `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and so there are 2 reserved slots for those methods. - IO types where it makes sense to have multiple tasks waiting on them now take advantage of this new `async fn readiness`, such as `UdpSocket` and `UnixDatagram`. Additionally, this makes the `io-driver` "feature" internal-only (no longer documented, not part of public API), and adds a second internal-only feature, `io-readiness`, to group together linked list part of registration that is only used by some of the IO types. After a bit of discussion, changing stream-based transports (like `TcpStream`) to have `async fn read(&self)` is punted, since that is likely too easy of a footgun to activate. Refs: #2779, #2728
Diffstat (limited to 'tokio/src/net/unix/datagram/socket.rs')
-rw-r--r--tokio/src/net/unix/datagram/socket.rs227
1 files changed, 34 insertions, 193 deletions
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index ba3a10c4..78baf279 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -1,7 +1,4 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf};
-use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf};
use std::convert::TryFrom;
use std::fmt;
@@ -10,7 +7,6 @@ use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
-use std::task::{Context, Poll};
cfg_uds! {
/// An I/O object representing a Unix datagram socket.
@@ -38,9 +34,9 @@ cfg_uds! {
///
/// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx");
- /// let mut tx = UnixDatagram::bind(&tx_path)?;
+ /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?;
@@ -64,7 +60,7 @@ cfg_uds! {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -128,7 +124,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -208,12 +204,12 @@ impl UnixDatagram {
/// use tempfile::tempdir;
///
/// // Create an unbound socket
- /// let mut tx = UnixDatagram::unbound()?;
+ /// let tx = UnixDatagram::unbound()?;
///
/// // Create another, bound socket
/// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// // Send to the bound socket
/// let bytes = b"hello world";
@@ -247,12 +243,12 @@ impl UnixDatagram {
/// use tempfile::tempdir;
///
/// // Create an unbound socket
- /// let mut tx = UnixDatagram::unbound()?;
+ /// let tx = UnixDatagram::unbound()?;
///
/// // Create another, bound socket
/// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// // Connect to the bound socket
/// tx.connect(&rx_path)?;
@@ -284,7 +280,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -300,8 +296,10 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_send_priv(cx, buf)).await
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::writable(), |sock| sock.send(buf))
+ .await
}
/// Try to send a datagram to the peer without waiting.
@@ -371,32 +369,6 @@ impl UnixDatagram {
self.io.get_ref().send_to(buf, target)
}
- // Poll IO functions that takes `&self` are provided for the split API.
- //
- // They are not public because (taken from the doc of `PollEvented`):
- //
- // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
- // caller must ensure that there are at most two tasks that use a
- // `PollEvented` instance concurrently. One for reading and one for writing.
- // While violating this requirement is "safe" from a Rust memory model point
- // of view, it will result in unexpected behavior in the form of lost
- // notifications and tasks hanging.
- pub(crate) fn poll_send_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
/// Receives data from the socket.
///
/// # Examples
@@ -407,7 +379,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -423,8 +395,10 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_recv_priv(cx, buf)).await
+ pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::readable(), |sock| sock.recv(buf))
+ .await
}
/// Try to receive a datagram from the peer without waiting.
@@ -455,22 +429,6 @@ impl UnixDatagram {
self.io.get_ref().recv(buf)
}
- pub(crate) fn poll_recv_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
/// Sends data on the socket to the specified address.
///
/// # Examples
@@ -487,9 +445,9 @@ impl UnixDatagram {
///
/// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx");
- /// let mut tx = UnixDatagram::bind(&tx_path)?;
+ /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?;
@@ -504,28 +462,15 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
+ pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where
- P: AsRef<Path> + Unpin,
+ P: AsRef<Path>,
{
- poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await
- }
-
- pub(crate) fn poll_send_to_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- target: &Path,
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send_to(buf, target) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ self.io
+ .async_io(mio::Ready::writable(), |sock| {
+ sock.send_to(buf, target.as_ref())
+ })
+ .await
}
/// Receives data from the socket.
@@ -544,9 +489,9 @@ impl UnixDatagram {
///
/// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx");
- /// let mut tx = UnixDatagram::bind(&tx_path)?;
+ /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?;
@@ -561,8 +506,10 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await
+ pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf))
+ .await
}
/// Try to receive data from the socket without waiting.
@@ -601,22 +548,6 @@ impl UnixDatagram {
self.io.get_ref().recv_from(buf)
}
- pub(crate) fn poll_recv_from_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<Result<(usize, SocketAddr), io::Error>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv_from(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
/// Returns the local address that this socket is bound to.
///
/// # Examples
@@ -748,7 +679,7 @@ impl UnixDatagram {
/// use std::net::Shutdown;
///
/// // Create an unbound socket
- /// let (mut socket, other) = UnixDatagram::pair()?;
+ /// let (socket, other) = UnixDatagram::pair()?;
///
/// socket.shutdown(Shutdown::Both)?;
///
@@ -768,102 +699,12 @@ impl UnixDatagram {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how)
}
-
- // These lifetime markers also appear in the generated documentation, and make
- // it more clear that this is a *borrowed* split.
- #[allow(clippy::needless_lifetimes)]
- /// Split a `UnixDatagram` into a receive half and a send half, which can be used
- /// to receive and send the datagram concurrently.
- ///
- /// This method is more efficient than [`into_split`], but the halves cannot
- /// be moved into independently spawned tasks.
- ///
- /// [`into_split`]: fn@crate::net::UnixDatagram::into_split
- ///
- /// # Examples
- /// ```
- /// # use std::error::Error;
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn Error>> {
- /// use tokio::net::UnixDatagram;
- ///
- /// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
- ///
- /// // Split sock1
- /// let (sock1_rx, mut sock1_tx) = sock1.split();
- ///
- /// // Since the sockets are paired, the paired send/recv
- /// // functions can be used
- /// let bytes = b"hello world";
- /// sock1_tx.send(bytes).await?;
- ///
- /// let mut buff = vec![0u8; 24];
- /// let size = sock2.recv(&mut buff).await?;
- ///
- /// let dgram = &buff[..size];
- /// assert_eq!(dgram, bytes);
- ///
- /// # Ok(())
- /// # }
- /// ```
- pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) {
- split(self)
- }
-
- /// Split a `UnixDatagram` into a receive half and a send half, which can be used
- /// to receive and send the datagram concurrently.
- ///
- /// Unlike [`split`], the owned halves can be moved to separate tasks,
- /// however this comes at the cost of a heap allocation.
- ///
- /// **Note:** Dropping the write half will shut down the write half of the
- /// datagram. This is equivalent to calling [`shutdown(Write)`].
- ///
- /// # Examples
- /// ```
- /// # use std::error::Error;
- /// # #[tokio::main]
- /// # async fn main() -> Result<(), Box<dyn Error>> {
- /// use tokio::net::UnixDatagram;
- ///
- /// // Create the pair of sockets
- /// let (sock1, mut sock2) = UnixDatagram::pair()?;
- ///
- /// // Split sock1
- /// let (sock1_rx, mut sock1_tx) = sock1.into_split();
- ///
- /// // Since the sockets are paired, the paired send/recv
- /// // functions can be used
- /// let bytes = b"hello world";
- /// sock1_tx.send(bytes).await?;
- ///
- /// let mut buff = vec![0u8; 24];
- /// let size = sock2.recv(&mut buff).await?;
- ///
- /// let dgram = &buff[..size];
- /// assert_eq!(dgram, bytes);
- ///
- /// # Ok(())
- /// # }
- /// ```
- ///
- /// [`split`]: fn@crate::net::UnixDatagram::split
- /// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown
- pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) {
- split_owned(self)
- }
}
impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> {
value.io.into_inner()
}