summaryrefslogtreecommitdiffstats
path: root/tokio/src/net
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-09-23 13:02:15 -0700
committerGitHub <noreply@github.com>2020-09-23 13:02:15 -0700
commita0557840eb424e174bf81a0175c40f9e176a2cc2 (patch)
tree676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/net
parentf25f12d57638a2928b3f738b3b1392d8773e276e (diff)
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways: - Cleans up the cached readiness in `PollEvented`. This cache used to be helpful when readiness was a linked list of `*mut Node`s in `Registration`. Previous refactors have turned `Registration` into just an `AtomicUsize` holding the current readiness, so the cache is just extra work and complexity. Gone. - Polling the `Registration` for readiness now gives a `ReadyEvent`, which includes the driver tick. This event must be passed back into `clear_readiness`, so that the readiness is only cleared from `Registration` if the tick hasn't changed. Previously, it was possible to clear the readiness even though another thread had *just* polled the driver and found the socket ready again. - Registration now also contains an `async fn readiness`, which stores wakers in an instrusive linked list. This allows an unbounded number of tasks to register for readiness (previously, only 1 per direction (read and write)). By using the intrusive linked list, there is no concern of leaking the storage of the wakers, since they are stored inside the `async fn` and released when the future is dropped. - Registration retains a `poll_readiness(Direction)` method, to support `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and so there are 2 reserved slots for those methods. - IO types where it makes sense to have multiple tasks waiting on them now take advantage of this new `async fn readiness`, such as `UdpSocket` and `UnixDatagram`. Additionally, this makes the `io-driver` "feature" internal-only (no longer documented, not part of public API), and adds a second internal-only feature, `io-readiness`, to group together linked list part of registration that is only used by some of the IO types. After a bit of discussion, changing stream-based transports (like `TcpStream`) to have `async fn read(&self)` is punted, since that is likely too easy of a footgun to activate. Refs: #2779, #2728
Diffstat (limited to 'tokio/src/net')
-rw-r--r--tokio/src/net/tcp/listener.rs20
-rw-r--r--tokio/src/net/tcp/stream.rs254
-rw-r--r--tokio/src/net/udp/mod.rs4
-rw-r--r--tokio/src/net/udp/socket.rs116
-rw-r--r--tokio/src/net/udp/split.rs148
-rw-r--r--tokio/src/net/unix/datagram/mod.rs5
-rw-r--r--tokio/src/net/unix/datagram/socket.rs227
-rw-r--r--tokio/src/net/unix/datagram/split.rs68
-rw-r--r--tokio/src/net/unix/datagram/split_owned.rs148
-rw-r--r--tokio/src/net/unix/listener.rs30
-rw-r--r--tokio/src/net/unix/stream.rs54
11 files changed, 227 insertions, 847 deletions
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index 0d7bbdbb..323b8bca 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -205,15 +205,16 @@ impl TcpListener {
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
- match self.io.get_ref().accept_std() {
- Ok(pair) => Poll::Ready(Ok(pair)),
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ match self.io.get_ref().accept_std() {
+ Ok(pair) => return Poll::Ready(Ok(pair)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -411,11 +412,6 @@ impl TryFrom<TcpListener> for mio::net::TcpListener {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: TcpListener) -> Result<Self, Self::Error> {
value.io.into_inner()
}
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index e0348724..f4f705b4 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -299,15 +299,16 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
- match self.io.get_ref().peek(buf) {
- Ok(ret) => Poll::Ready(Ok(ret)),
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ match self.io.get_ref().peek(buf) {
+ Ok(ret) => return Poll::Ready(Ok(ret)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -703,26 +704,28 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes.
- let b =
- unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
- match self.io.get_ref().read(b) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- Ok(n) => {
- // Safety: We trust `TcpStream::read` to have filled up `n` bytes
- // in the buffer.
- unsafe {
- buf.assume_init(n);
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().read(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Ok(n) => {
+ // Safety: We trust `TcpStream::read` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.add_filled(n);
+ return Poll::Ready(Ok(()));
}
- buf.add_filled(n);
- Poll::Ready(Ok(()))
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -731,14 +734,15 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
- match self.io.get_ref().write(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+ match self.io.get_ref().write(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ x => return Poll::Ready(x),
}
- x => Poll::Ready(x),
}
}
@@ -749,99 +753,100 @@ impl TcpStream {
) -> Poll<io::Result<usize>> {
use std::io::IoSlice;
- ready!(self.io.poll_write_ready(cx))?;
-
- // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
- // a dummy version from a 1-length slice which we'll overwrite with
- // the `bytes_vectored` method.
- static S: &[u8] = &[0];
- const MAX_BUFS: usize = 64;
-
- // IoSlice isn't Copy, so we must expand this manually ;_;
- let mut slices: [IoSlice<'_>; MAX_BUFS] = [
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- ];
- let cnt = buf.bytes_vectored(&mut slices);
-
- let iovec = <&IoVec>::from(S);
- let mut vecs = [iovec; MAX_BUFS];
- for i in 0..cnt {
- vecs[i] = (*slices[i]).into();
- }
-
- match self.io.get_ref().write_bufs(&vecs[..cnt]) {
- Ok(n) => {
- buf.advance(n);
- Poll::Ready(Ok(n))
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
+
+ // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
+ // a dummy version from a 1-length slice which we'll overwrite with
+ // the `bytes_vectored` method.
+ static S: &[u8] = &[0];
+ const MAX_BUFS: usize = 64;
+
+ // IoSlice isn't Copy, so we must expand this manually ;_;
+ let mut slices: [IoSlice<'_>; MAX_BUFS] = [
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ ];
+ let cnt = buf.bytes_vectored(&mut slices);
+
+ let iovec = <&IoVec>::from(S);
+ let mut vecs = [iovec; MAX_BUFS];
+ for i in 0..cnt {
+ vecs[i] = (*slices[i]).into();
}
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+
+ match self.io.get_ref().write_bufs(&vecs[..cnt]) {
+ Ok(n) => {
+ buf.advance(n);
+ return Poll::Ready(Ok(n));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
}
@@ -850,11 +855,6 @@ impl TryFrom<TcpStream> for mio::net::TcpStream {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: TcpStream) -> Result<Self, Self::Error> {
value.io.into_inner()
}
diff --git a/tokio/src/net/udp/mod.rs b/tokio/src/net/udp/mod.rs
index d43121a1..c9bb0f83 100644
--- a/tokio/src/net/udp/mod.rs
+++ b/tokio/src/net/udp/mod.rs
@@ -1,7 +1,3 @@
//! UDP utility types.
pub(crate) mod socket;
-pub(crate) use socket::UdpSocket;
-
-mod split;
-pub use split::{RecvHalf, ReuniteError, SendHalf};
diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs
index aeb25fb3..d0dece3e 100644
--- a/tokio/src/net/udp/socket.rs
+++ b/tokio/src/net/udp/socket.rs
@@ -1,13 +1,10 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::udp::split::{split, RecvHalf, SendHalf};
use crate::net::ToSocketAddrs;
use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::task::{Context, Poll};
cfg_udp! {
/// A UDP socket
@@ -67,15 +64,7 @@ impl UdpSocket {
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
- let io = PollEvented::new(io)?;
- Ok(UdpSocket { io })
- }
-
- /// Splits the `UdpSocket` into a receive half and a send half. The two parts
- /// can be used to receive and send datagrams concurrently, even from two
- /// different tasks.
- pub fn split(self) -> (RecvHalf, SendHalf) {
- split(self)
+ UdpSocket::new(io)
}
/// Returns the local address that this socket is bound to.
@@ -112,8 +101,10 @@ impl UdpSocket {
/// will resolve to an error if the socket is not connected.
///
/// [`connect`]: method@Self::connect
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_send(cx, buf)).await
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::writable(), |sock| sock.send(buf))
+ .await
}
/// Try to send data on the socket to the remote address to which it is
@@ -130,29 +121,6 @@ impl UdpSocket {
self.io.get_ref().send(buf)
}
- // Poll IO functions that takes `&self` are provided for the split API.
- //
- // They are not public because (taken from the doc of `PollEvented`):
- //
- // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
- // caller must ensure that there are at most two tasks that use a
- // `PollEvented` instance concurrently. One for reading and one for writing.
- // While violating this requirement is "safe" from a Rust memory model point
- // of view, it will result in unexpected behavior in the form of lost
- // notifications and tasks hanging.
- #[doc(hidden)]
- pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
- }
-
/// Returns a future that receives a single datagram message on the socket from
/// the remote address to which it is connected. On success, the future will resolve
/// to the number of bytes read.
@@ -165,21 +133,10 @@ impl UdpSocket {
/// will fail if the socket is not connected.
///
/// [`connect`]: method@Self::connect
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_recv(cx, buf)).await
- }
-
- #[doc(hidden)]
- pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::readable(), |sock| sock.recv(buf))
+ .await
}
/// Returns a future that sends data on the socket to the given address.
@@ -187,11 +144,11 @@ impl UdpSocket {
///
/// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`.
- pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> {
+ pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> {
let mut addrs = target.to_socket_addrs().await?;
match addrs.next() {
- Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await,
+ Some(target) => self.send_to_addr(buf, &target).await,
None => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"no addresses to send data to",
@@ -214,23 +171,10 @@ impl UdpSocket {
self.io.get_ref().send_to(buf, &target)
}
- // TODO: Public or not?
- #[doc(hidden)]
- pub fn poll_send_to(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- target: &SocketAddr,
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send_to(buf, target) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::writable(), |sock| sock.send_to(buf, target))
+ .await
}
/// Returns a future that receives a single datagram on the socket. On success,
@@ -239,25 +183,10 @@ impl UdpSocket {
/// The function must be called with valid byte array `buf` of sufficient size
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.poll_recv_from(cx, buf)).await
- }
-
- #[doc(hidden)]
- pub fn poll_recv_from(
- &self,
- cx: &mut Context<'_>,
- buf: &mut [u8],
- ) -> Poll<Result<(usize, SocketAddr), io::Error>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- match self.io.get_ref().recv_from(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- x => Poll::Ready(x),
- }
+ pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ self.io
+ .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf))
+ .await
}
/// Gets the value of the `SO_BROADCAST` option for this socket.
@@ -399,11 +328,6 @@ impl TryFrom<UdpSocket> for mio::net::UdpSocket {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UdpSocket) -> Result<Self, Self::Error> {
value.io.into_inner()
}
@@ -423,7 +347,7 @@ impl TryFrom<net::UdpSocket> for UdpSocket {
impl fmt::Debug for UdpSocket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.io.get_ref().fmt(f)
+ self.io.fmt(f)
}
}
diff --git a/tokio/src/net/udp/split.rs b/tokio/src/net/udp/split.rs
deleted file mode 100644
index 8d87f1c7..00000000
--- a/tokio/src/net/udp/split.rs
+++ /dev/null
@@ -1,148 +0,0 @@
-//! [`UdpSocket`](crate::net::UdpSocket) split support.
-//!
-//! The [`split`](method@crate::net::UdpSocket::split) method splits a
-//! `UdpSocket` into a receive half and a send half, which can be used to
-//! receive and send datagrams concurrently, even from two different tasks.
-//!
-//! The halves provide access to the underlying socket, implementing
-//! `AsRef<UdpSocket>`. This allows you to call `UdpSocket` methods that takes
-//! `&self`, e.g., to get local address, to get and set socket options, to join
-//! or leave multicast groups, etc.
-//!
-//! The halves can be reunited to the original socket with their `reunite`
-//! methods.
-
-use crate::future::poll_fn;
-use crate::net::udp::UdpSocket;
-
-use std::error::Error;
-use std::fmt;
-use std::io;
-use std::net::SocketAddr;
-use std::sync::Arc;
-
-/// The send half after [`split`](super::UdpSocket::split).
-///
-/// Use [`send_to`](method@Self::send_to) or [`send`](method@Self::send) to send
-/// datagrams.
-#[derive(Debug)]
-pub struct SendHalf(Arc<UdpSocket>);
-
-/// The recv half after [`split`](super::UdpSocket::split).
-///
-/// Use [`recv_from`](method@Self::recv_from) or [`recv`](method@Self::recv) to receive
-/// datagrams.
-#[derive(Debug)]
-pub struct RecvHalf(Arc<UdpSocket>);
-
-pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) {
- let shared = Arc::new(socket);
- let send = shared.clone();
- let recv = shared;
- (RecvHalf(recv), SendHalf(send))
-}
-
-/// Error indicating that two halves were not from the same socket, and thus could
-/// not be `reunite`d.
-#[derive(Debug)]
-pub struct ReuniteError(pub SendHalf, pub RecvHalf);
-
-impl fmt::Display for ReuniteError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "tried to reunite halves that are not from the same socket"
- )
- }
-}
-
-impl Error for ReuniteError {}
-
-fn reunite(s: SendHalf, r: RecvHalf) -> Result<UdpSocket, ReuniteError> {
- if Arc::ptr_eq(&s.0, &r.0) {
- drop(r);
- // Only two instances of the `Arc` are ever created, one for the
- // receiver and one for the sender, and those `Arc`s are never exposed
- // externally. And so when we drop one here, the other one must be the
- // only remaining one.
- Ok(Arc::try_unwrap(s.0).expect("udp: try_unwrap failed in reunite"))
- } else {
- Err(ReuniteError(s, r))
- }
-}
-
-impl RecvHalf {
- /// Attempts to put the two "halves" of a `UdpSocket` back together and
- /// recover the original socket. Succeeds only if the two "halves"
- /// originated from the same call to `UdpSocket::split`.
- pub fn reunite(self, other: SendHalf) -> Result<UdpSocket, ReuniteError> {
- reunite(other, self)
- }
-
- /// Returns a future that receives a single datagram on the socket. On success,
- /// the future resolves to the number of bytes read and the origin.
- ///
- /// The function must be called with valid byte array `buf` of sufficient size
- /// to hold the message bytes. If a message is too long to fit in the supplied
- /// buffer, excess bytes may be discarded.
- pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
- poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await
- }
-
- /// Returns a future that receives a single datagram message on the socket from
- /// the remote address to which it is connected. On success, the future will resolve
- /// to the number of bytes read.
- ///
- /// The function must be called with valid byte array `buf` of sufficient size to
- /// hold the message bytes. If a message is too long to fit in the supplied buffer,
- /// excess bytes may be discarded.
- ///
- /// The [`connect`] method will connect this socket to a remote address. The future
- /// will fail if the socket is not connected.
- ///
- /// [`connect`]: super::UdpSocket::connect
- pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_recv(cx, buf)).await
- }
-}
-
-impl SendHalf {
- /// Attempts to put the two "halves" of a `UdpSocket` back together and
- /// recover the original socket. Succeeds only if the two "halves"
- /// originated from the same call to `UdpSocket::split`.
- pub fn reunite(self, other: RecvHalf) -> Result<UdpSocket, ReuniteError> {
- reunite(self, other)
- }
-
- /// Returns a future that sends data on the socket to the given address.
- /// On success, the future will resolve to the number of bytes written.
- ///
- /// The future will resolve to an error if the IP version of the socket does
- /// not match that of `target`.
- pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await
- }
-
- /// Returns a future that sends data on the socket to the remote address to which it is connected.
- /// On success, the future will resolve to the number of bytes written.
- ///
- /// The [`connect`] method will connect this socket to a remote address. The future
- /// will resolve to an error if the socket is not connected.
- ///
- /// [`connect`]: super::UdpSocket::connect
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.0.poll_send(cx, buf)).await
- }
-}
-
-impl AsRef<UdpSocket> for SendHalf {
- fn as_ref(&self) -> &UdpSocket {
- &self.0
- }
-}
-
-impl AsRef<UdpSocket> for RecvHalf {
- fn as_ref(&self) -> &UdpSocket {
- &self.0
- }
-}
diff --git a/tokio/src/net/unix/datagram/mod.rs b/tokio/src/net/unix/datagram/mod.rs
index f484ae34..6268b4ac 100644
--- a/tokio/src/net/unix/datagram/mod.rs
+++ b/tokio/src/net/unix/datagram/mod.rs
@@ -1,8 +1,3 @@
//! Unix datagram types.
pub(crate) mod socket;
-pub(crate) mod split;
-pub(crate) mod split_owned;
-
-pub use split::{RecvHalf, SendHalf};
-pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError};
diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs
index ba3a10c4..78baf279 100644
--- a/tokio/src/net/unix/datagram/socket.rs
+++ b/tokio/src/net/unix/datagram/socket.rs
@@ -1,7 +1,4 @@
-use crate::future::poll_fn;
use crate::io::PollEvented;
-use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf};
-use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf};
use std::convert::TryFrom;
use std::fmt;
@@ -10,7 +7,6 @@ use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr};
use std::path::Path;
-use std::task::{Context, Poll};
cfg_uds! {
/// An I/O object representing a Unix datagram socket.
@@ -38,9 +34,9 @@ cfg_uds! {
///
/// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx");
- /// let mut tx = UnixDatagram::bind(&tx_path)?;
+ /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?;
@@ -64,7 +60,7 @@ cfg_uds! {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -128,7 +124,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -208,12 +204,12 @@ impl UnixDatagram {
/// use tempfile::tempdir;
///
/// // Create an unbound socket
- /// let mut tx = UnixDatagram::unbound()?;
+ /// let tx = UnixDatagram::unbound()?;
///
/// // Create another, bound socket
/// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// // Send to the bound socket
/// let bytes = b"hello world";
@@ -247,12 +243,12 @@ impl UnixDatagram {
/// use tempfile::tempdir;
///
/// // Create an unbound socket
- /// let mut tx = UnixDatagram::unbound()?;
+ /// let tx = UnixDatagram::unbound()?;
///
/// // Create another, bound socket
/// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx");
- /// let mut rx = UnixDatagram::bind(&rx_path)?;
+ /// let rx = UnixDatagram::bind(&rx_path)?;
///
/// // Connect to the bound socket
/// tx.connect(&rx_path)?;
@@ -284,7 +280,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
- /// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
@@ -300,8 +296,10 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
- pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
- poll_fn(|cx| self.poll_send_priv(cx, buf)).await
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .async_io(mio::Ready::writable(), |sock| sock.send(buf))
+ .await
}
/// Try to send a datagram to the peer without waiting.
@@ -371,32 +369,6 @@ impl UnixDatagram {
self.io.get_ref().send_to(buf, target)
}
- // Poll IO functions that takes `&self` are provided for the split API.
- //
- // They are not public because (taken from the doc of `PollEvented`):
- //
- // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
- // caller must ensure that there are at most two tasks that use a
- // `PollEvented` instance concurrently. One for reading and one for writing.
- // While violating this requirement is "safe" from a Rust memory model point
- // of view, it will result in unexpected behavior in the form of lost
- // notifications and tasks hanging.
- pub(crate) fn poll_send_priv(
- &self,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
-
- match self.io.get_ref().send(buf) {
- Err(ref e) if