diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-09-23 13:02:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 13:02:15 -0700 |
commit | a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch) | |
tree | 676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/net/tcp | |
parent | f25f12d57638a2928b3f738b3b1392d8773e276e (diff) |
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an intrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together the linked list part of registration
that is only used by some of the IO types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
Refs: #2779, #2728
Diffstat (limited to 'tokio/src/net/tcp')
-rw-r--r-- | tokio/src/net/tcp/listener.rs | 20 | ||||
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 254 |
2 files changed, 135 insertions, 139 deletions
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 0d7bbdbb..323b8bca 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -205,15 +205,16 @@ impl TcpListener { &mut self, cx: &mut Context<'_>, ) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().accept_std() { - Ok(pair) => Poll::Ready(Ok(pair)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().accept_std() { + Ok(pair) => return Poll::Ready(Ok(pair)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -411,11 +412,6 @@ impl TryFrom<TcpListener> for mio::net::TcpListener { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. 
- /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpListener) -> Result<Self, Self::Error> { value.io.into_inner() } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index e0348724..f4f705b4 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -299,15 +299,16 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; - match self.io.get_ref().peek(buf) { - Ok(ret) => Poll::Ready(Ok(ret)), - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending + match self.io.get_ref().peek(buf) { + Ok(ret) => return Poll::Ready(Ok(ret)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -703,26 +704,28 @@ impl TcpStream { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>> { - ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; - - // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes. - let b = - unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; - match self.io.get_ref().read(b) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_read_ready(cx, mio::Ready::readable())?; - Poll::Pending - } - Ok(n) => { - // Safety: We trust `TcpStream::read` to have filled up `n` bytes - // in the buffer. - unsafe { - buf.assume_init(n); + loop { + let ev = ready!(self.io.poll_read_ready(cx))?; + + // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes. 
+ let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) + }; + match self.io.get_ref().read(b) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Ok(n) => { + // Safety: We trust `TcpStream::read` to have filled up `n` bytes + // in the buffer. + unsafe { + buf.assume_init(n); + } + buf.add_filled(n); + return Poll::Ready(Ok(())); } - buf.add_filled(n); - Poll::Ready(Ok(())) + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } @@ -731,14 +734,15 @@ impl TcpStream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>> { - ready!(self.io.poll_write_ready(cx))?; + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; - match self.io.get_ref().write(buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + x => return Poll::Ready(x), } - x => Poll::Ready(x), } } @@ -749,99 +753,100 @@ impl TcpStream { ) -> Poll<io::Result<usize>> { use std::io::IoSlice; - ready!(self.io.poll_write_ready(cx))?; - - // The `IoVec` (v0.1.x) type can't have a zero-length size, so create - // a dummy version from a 1-length slice which we'll overwrite with - // the `bytes_vectored` method. 
- static S: &[u8] = &[0]; - const MAX_BUFS: usize = 64; - - // IoSlice isn't Copy, so we must expand this manually ;_; - let mut slices: [IoSlice<'_>; MAX_BUFS] = [ - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - IoSlice::new(S), - ]; - let cnt = buf.bytes_vectored(&mut slices); - - let iovec = <&IoVec>::from(S); - let mut vecs = [iovec; MAX_BUFS]; - for i in 0..cnt { - vecs[i] = (*slices[i]).into(); - } - - match self.io.get_ref().write_bufs(&vecs[..cnt]) { - Ok(n) => { - buf.advance(n); - Poll::Ready(Ok(n)) + loop { + let ev = ready!(self.io.poll_write_ready(cx))?; + + // The `IoVec` (v0.1.x) type can't have a zero-length size, so create + // a dummy version from a 1-length slice which we'll overwrite with + // the `bytes_vectored` method. 
+ static S: &[u8] = &[0]; + const MAX_BUFS: usize = 64; + + // IoSlice isn't Copy, so we must expand this manually ;_; + let mut slices: [IoSlice<'_>; MAX_BUFS] = [ + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + ]; + let cnt = buf.bytes_vectored(&mut slices); + + let iovec = <&IoVec>::from(S); + let mut vecs = [iovec; MAX_BUFS]; + for i in 0..cnt { + vecs[i] = (*slices[i]).into(); } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_write_ready(cx)?; - Poll::Pending + + match self.io.get_ref().write_bufs(&vecs[..cnt]) { + Ok(n) => { + buf.advance(n); + return Poll::Ready(Ok(n)); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_readiness(ev); + } + Err(e) => return Poll::Ready(Err(e)), } - Err(e) => Poll::Ready(Err(e)), } } } @@ -850,11 +855,6 @@ impl 
TryFrom<TcpStream> for mio::net::TcpStream { type Error = io::Error; /// Consumes value, returning the mio I/O object. - /// - /// See [`PollEvented::into_inner`] for more details about - /// resource deregistration that happens during the call. - /// - /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpStream) -> Result<Self, Self::Error> { value.io.into_inner() } |