diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-09-23 13:02:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 13:02:15 -0700 |
commit | a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch) | |
tree | 676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/net | |
parent | f25f12d57638a2928b3f738b3b1392d8773e276e (diff) |
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an instrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
Refs: #2779, #2728
Diffstat (limited to 'tokio/src/net')
-rw-r--r-- | tokio/src/net/tcp/listener.rs | 20 | ||||
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 254 | ||||
-rw-r--r-- | tokio/src/net/udp/mod.rs | 4 | ||||
-rw-r--r-- | tokio/src/net/udp/socket.rs | 116 | ||||
-rw-r--r-- | tokio/src/net/udp/split.rs | 148 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/mod.rs | 5 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs | 227 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split.rs | 68 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/split_owned.rs | 148 | ||||
-rw-r--r-- | tokio/src/net/unix/listener.rs | 30 | ||||
-rw-r--r-- | tokio/src/net/unix/stream.rs | 54 |
11 files changed, 227 insertions, 847 deletions
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 0d7bbdbb..323b8bca 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -205,15 +205,16 @@ impl TcpListener { &mut self, cx: &mut Context<'_>, ) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(pair) => Poll::Ready(Ok(pair)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().accept_std() { + Ok(pair) => return Poll::Ready(Ok(pair)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -411,11 +412,6 @@ impl TryFrom<TcpListener> for mio::net::TcpListener { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpListener) -> Result<Self, Self::Error> { value.io.into_inner() } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index e0348724..f4f705b4 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -299,15 +299,16 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().peek(buf) { - Ok(ret) => Poll::Ready(Ok(ret)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().peek(buf) { + Ok(ret) => return Poll::Ready(Ok(ret)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -703,26 +704,28 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes. - let b = - unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - Ok(n) => { - // Safety: We trust `TcpStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes. + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) + }; + match self.io.get_ref().read(b) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Ok(n) => { + // Safety: We trust `TcpStream::read` to have filled up `n` bytes + // in the buffer. + unsafe { + buf.assume_init(n); + } + buf.add_filled(n); + return Poll::Ready(Ok(())); } - buf.add_filled(n); - Poll::Ready(Ok(())) + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -731,14 +734,15 @@ impl TcpStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + x => return Poll::Ready(x), } - x => Poll::Ready(x), } } @@ -749,99 +753,100 @@ impl TcpStream { ) -> Poll<io::Result<usize>> { use std::io::IoSlice; - ready!(self.io.poll_write_ready(cx))?; - - // The `IoVec` (v0.1.x) type can't have a zero-length size, so create - // a dummy version from a 1-length slice which we'll overwrite with - // the `bytes_vectored` method. - static S: &[u8] = &[0]; - const MAX_BUFS: usize = 64; - - // IoSlice isn't Copy, so we must expand this manually ;_; - let mut slices: [IoSlice<'_>; MAX_BUFS] = [ - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - ]; - let cnt = buf.bytes_vectored(&mut slices); - - let iovec = <&IoVec>::from(S); - let mut vecs = [iovec; MAX_BUFS]; - for i in 0..cnt { - vecs[i] = (*slices[i]).into(); - } - - match self.io.get_ref().write_bufs(&vecs[..cnt]) { - Ok(n) => { - buf.advance(n); - Poll::Ready(Ok(n)) + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; + + // The `IoVec` (v0.1.x) type can't have a zero-length size, so create + // a dummy version from a 1-length slice which we'll overwrite with + // the `bytes_vectored` method. + static S: &[u8] = &[0]; + const MAX_BUFS: usize = 64; + + // IoSlice isn't Copy, so we must expand this manually ;_; + let mut slices: [IoSlice<'_>; MAX_BUFS] = [ + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + ]; + let cnt = buf.bytes_vectored(&mut slices); + + let iovec = <&IoVec>::from(S); + let mut vecs = [iovec; MAX_BUFS]; + for i in 0..cnt { + vecs[i] = (*slices[i]).into(); } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + + match self.io.get_ref().write_bufs(&vecs[..cnt]) { + Ok(n) => { + buf.advance(n); + return Poll::Ready(Ok(n)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } } @@ -850,11 +855,6 @@ impl TryFrom<TcpStream> for mio::net::TcpStream { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpStream) -> Result<Self, Self::Error> { value.io.into_inner() } diff --git a/tokio/src/net/udp/mod.rs b/tokio/src/net/udp/mod.rs index d43121a1..c9bb0f83 100644 --- a/tokio/src/net/udp/mod.rs +++ b/tokio/src/net/udp/mod.rs @@ -1,7 +1,3 @@ //! UDP utility types. pub(crate) mod socket; -pub(crate) use socket::UdpSocket; - -mod split; -pub use split::{RecvHalf, ReuniteError, SendHalf}; diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index aeb25fb3..d0dece3e 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -1,13 +1,10 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::udp::split::{split, RecvHalf, SendHalf}; use crate::net::ToSocketAddrs; use std::convert::TryFrom; use std::fmt; use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::task::{Context, Poll}; cfg_udp! { /// A UDP socket @@ -67,15 +64,7 @@ impl UdpSocket { /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { let io = mio::net::UdpSocket::from_socket(socket)?; - let io = PollEvented::new(io)?; - Ok(UdpSocket { io }) - } - - /// Splits the `UdpSocket` into a receive half and a send half. The two parts - /// can be used to receive and send datagrams concurrently, even from two - /// different tasks. - pub fn split(self) -> (RecvHalf, SendHalf) { - split(self) + UdpSocket::new(io) } /// Returns the local address that this socket is bound to. @@ -112,8 +101,10 @@ impl UdpSocket { /// will resolve to an error if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send data on the socket to the remote address to which it is @@ -130,29 +121,6 @@ impl UdpSocket { self.io.get_ref().send(buf) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - #[doc(hidden)] - pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } - } - /// Returns a future that receives a single datagram message on the socket from /// the remote address to which it is connected. On success, the future will resolve /// to the number of bytes read. @@ -165,21 +133,10 @@ impl UdpSocket { /// will fail if the socket is not connected. /// /// [`connect`]: method@Self::connect - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_recv(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } + pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv(buf)) + .await } /// Returns a future that sends data on the socket to the given address. @@ -187,11 +144,11 @@ impl UdpSocket { /// /// The future will resolve to an error if the IP version of the socket does /// not match that of `target`. - pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> { + pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> { let mut addrs = target.to_socket_addrs().await?; match addrs.next() { - Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, + Some(target) => self.send_to_addr(buf, &target).await, None => Err(io::Error::new( io::ErrorKind::InvalidInput, "no addresses to send data to", @@ -214,23 +171,10 @@ impl UdpSocket { self.io.get_ref().send_to(buf, &target) } - // TODO: Public or not? - #[doc(hidden)] - pub fn poll_send_to( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: &SocketAddr, - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send_to(buf, target) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending - } - x => Poll::Ready(x), - } + async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send_to(buf, target)) + .await } /// Returns a future that receives a single datagram on the socket. On success, @@ -239,25 +183,10 @@ impl UdpSocket { /// The function must be called with valid byte array `buf` of sufficient size /// to hold the message bytes. If a message is too long to fit in the supplied /// buffer, excess bytes may be discarded. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from(cx, buf)).await - } - - #[doc(hidden)] - pub fn poll_recv_from( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll<Result<(usize, SocketAddr), io::Error>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - match self.io.get_ref().recv_from(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - x => Poll::Ready(x), - } + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.io + .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf)) + .await } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -399,11 +328,6 @@ impl TryFrom<UdpSocket> for mio::net::UdpSocket { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: UdpSocket) -> Result<Self, Self::Error> { value.io.into_inner() } @@ -423,7 +347,7 @@ impl TryFrom<net::UdpSocket> for UdpSocket { impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.io.get_ref().fmt(f) + self.io.fmt(f) } } diff --git a/tokio/src/net/udp/split.rs b/tokio/src/net/udp/split.rs deleted file mode 100644 index 8d87f1c7..00000000 --- a/tokio/src/net/udp/split.rs +++ /dev/null @@ -1,148 +0,0 @@ -//! [`UdpSocket`](crate::net::UdpSocket) split support. -//! -//! The [`split`](method@crate::net::UdpSocket::split) method splits a -//! `UdpSocket` into a receive half and a send half, which can be used to -//! receive and send datagrams concurrently, even from two different tasks. -//! -//! The halves provide access to the underlying socket, implementing -//! `AsRef<UdpSocket>`. This allows you to call `UdpSocket` methods that takes -//! `&self`, e.g., to get local address, to get and set socket options, to join -//! or leave multicast groups, etc. -//! -//! The halves can be reunited to the original socket with their `reunite` -//! methods. - -use crate::future::poll_fn; -use crate::net::udp::UdpSocket; - -use std::error::Error; -use std::fmt; -use std::io; -use std::net::SocketAddr; -use std::sync::Arc; - -/// The send half after [`split`](super::UdpSocket::split). -/// -/// Use [`send_to`](method@Self::send_to) or [`send`](method@Self::send) to send -/// datagrams. -#[derive(Debug)] -pub struct SendHalf(Arc<UdpSocket>); - -/// The recv half after [`split`](super::UdpSocket::split). -/// -/// Use [`recv_from`](method@Self::recv_from) or [`recv`](method@Self::recv) to receive -/// datagrams. -#[derive(Debug)] -pub struct RecvHalf(Arc<UdpSocket>); - -pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) { - let shared = Arc::new(socket); - let send = shared.clone(); - let recv = shared; - (RecvHalf(recv), SendHalf(send)) -} - -/// Error indicating that two halves were not from the same socket, and thus could -/// not be `reunite`d. -#[derive(Debug)] -pub struct ReuniteError(pub SendHalf, pub RecvHalf); - -impl fmt::Display for ReuniteError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "tried to reunite halves that are not from the same socket" - ) - } -} - -impl Error for ReuniteError {} - -fn reunite(s: SendHalf, r: RecvHalf) -> Result<UdpSocket, ReuniteError> { - if Arc::ptr_eq(&s.0, &r.0) { - drop(r); - // Only two instances of the `Arc` are ever created, one for the - // receiver and one for the sender, and those `Arc`s are never exposed - // externally. And so when we drop one here, the other one must be the - // only remaining one. - Ok(Arc::try_unwrap(s.0).expect("udp: try_unwrap failed in reunite")) - } else { - Err(ReuniteError(s, r)) - } -} - -impl RecvHalf { - /// Attempts to put the two "halves" of a `UdpSocket` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UdpSocket::split`. - pub fn reunite(self, other: SendHalf) -> Result<UdpSocket, ReuniteError> { - reunite(other, self) - } - - /// Returns a future that receives a single datagram on the socket. On success, - /// the future resolves to the number of bytes read and the origin. - /// - /// The function must be called with valid byte array `buf` of sufficient size - /// to hold the message bytes. If a message is too long to fit in the supplied - /// buffer, excess bytes may be discarded. - pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await - } - - /// Returns a future that receives a single datagram message on the socket from - /// the remote address to which it is connected. On success, the future will resolve - /// to the number of bytes read. - /// - /// The function must be called with valid byte array `buf` of sufficient size to - /// hold the message bytes. If a message is too long to fit in the supplied buffer, - /// excess bytes may be discarded. - /// - /// The [`connect`] method will connect this socket to a remote address. The future - /// will fail if the socket is not connected. - /// - /// [`connect`]: super::UdpSocket::connect - pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_recv(cx, buf)).await - } -} - -impl SendHalf { - /// Attempts to put the two "halves" of a `UdpSocket` back together and - /// recover the original socket. Succeeds only if the two "halves" - /// originated from the same call to `UdpSocket::split`. - pub fn reunite(self, other: RecvHalf) -> Result<UdpSocket, ReuniteError> { - reunite(self, other) - } - - /// Returns a future that sends data on the socket to the given address. - /// On success, the future will resolve to the number of bytes written. - /// - /// The future will resolve to an error if the IP version of the socket does - /// not match that of `target`. - pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await - } - - /// Returns a future that sends data on the socket to the remote address to which it is connected. - /// On success, the future will resolve to the number of bytes written. - /// - /// The [`connect`] method will connect this socket to a remote address. The future - /// will resolve to an error if the socket is not connected. - /// - /// [`connect`]: super::UdpSocket::connect - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.0.poll_send(cx, buf)).await - } -} - -impl AsRef<UdpSocket> for SendHalf { - fn as_ref(&self) -> &UdpSocket { - &self.0 - } -} - -impl AsRef<UdpSocket> for RecvHalf { - fn as_ref(&self) -> &UdpSocket { - &self.0 - } -} diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs index f484ae34..6268b4ac 100644 --- a/tokio/src/net/unix/datagram/mod.rs +++ b/tokio/src/net/unix/datagram/mod.rs @@ -1,8 +1,3 @@ //! Unix datagram types. pub(crate) mod socket; -pub(crate) mod split; -pub(crate) mod split_owned; - -pub use split::{RecvHalf, SendHalf}; -pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError}; diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index ba3a10c4..78baf279 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1,7 +1,4 @@ -use crate::future::poll_fn; use crate::io::PollEvented; -use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf}; -use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf}; use std::convert::TryFrom; use std::fmt; @@ -10,7 +7,6 @@ use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{self, SocketAddr}; use std::path::Path; -use std::task::{Context, Poll}; cfg_uds! { /// An I/O object representing a Unix datagram socket. @@ -38,9 +34,9 @@ cfg_uds! { /// /// // Bind each socket to a filesystem path /// let tx_path = tmp.path().join("tx"); - /// let mut tx = UnixDatagram::bind(&tx_path)?; + /// let tx = UnixDatagram::bind(&tx_path)?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// let bytes = b"hello world"; /// tx.send_to(bytes, &rx_path).await?; @@ -64,7 +60,7 @@ cfg_uds! { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -128,7 +124,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -208,12 +204,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Send to the bound socket /// let bytes = b"hello world"; @@ -247,12 +243,12 @@ impl UnixDatagram { /// use tempfile::tempdir; /// /// // Create an unbound socket - /// let mut tx = UnixDatagram::unbound()?; + /// let tx = UnixDatagram::unbound()?; /// /// // Create another, bound socket /// let tmp = tempdir()?; /// let rx_path = tmp.path().join("rx"); - /// let mut rx = UnixDatagram::bind(&rx_path)?; + /// let rx = UnixDatagram::bind(&rx_path)?; /// /// // Connect to the bound socket /// tx.connect(&rx_path)?; @@ -284,7 +280,7 @@ impl UnixDatagram { /// use tokio::net::UnixDatagram; /// /// // Create the pair of sockets - /// let (mut sock1, mut sock2) = UnixDatagram::pair()?; + /// let (sock1, sock2) = UnixDatagram::pair()?; /// /// // Since the sockets are paired, the paired send/recv /// // functions can be used @@ -300,8 +296,10 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` - pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { - poll_fn(|cx| self.poll_send_priv(cx, buf)).await + pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .async_io(mio::Ready::writable(), |sock| sock.send(buf)) + .await } /// Try to send a datagram to the peer without waiting. @@ -371,32 +369,6 @@ impl UnixDatagram { self.io.get_ref().send_to(buf, target) } - // Poll IO functions that takes `&self` are provided for the split API. - // - // They are not public because (taken from the doc of `PollEvented`): - // - // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the - // caller must ensure that there are at most two tasks that use a - // `PollEvented` instance concurrently. One for reading and one for writing. - // While violating this requirement is "safe" from a Rust memory model point - // of view, it will result in unexpected behavior in the form of lost - // notifications and tasks hanging. - pub(crate) fn poll_send_priv( - &self, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; - - match self.io.get_ref().send(buf) { - Err(ref e) if |