author    Sean McArthur <sean@seanmonstar.com>  2020-09-23 13:02:15 -0700
committer GitHub <noreply@github.com>  2020-09-23 13:02:15 -0700
commit    a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch)
tree      676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio/src/net/tcp
parent    f25f12d57638a2928b3f738b3b1392d8773e276e (diff)
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:

- Cleans up the cached readiness in `PollEvented`. This cache used to be helpful when readiness was a linked list of `*mut Node`s in `Registration`. Previous refactors have turned `Registration` into just an `AtomicUsize` holding the current readiness, so the cache is just extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`, which includes the driver tick. This event must be passed back into `clear_readiness`, so that the readiness is only cleared from `Registration` if the tick hasn't changed. Previously, it was possible to clear the readiness even though another thread had *just* polled the driver and found the socket ready again (see the sketch after this list).
- `Registration` now also contains an `async fn readiness`, which stores wakers in an intrusive linked list. This allows an unbounded number of tasks to register for readiness (previously, only 1 per direction (read and write)). By using the intrusive linked list, there is no concern of leaking the storage of the wakers, since they are stored inside the `async fn` and released when the future is dropped.
- `Registration` retains a `poll_readiness(Direction)` method, to support `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and so there are 2 reserved slots for those methods.
- I/O types where it makes sense to have multiple tasks waiting on them now take advantage of this new `async fn readiness`, such as `UdpSocket` and `UnixDatagram`.

Additionally, this makes the `io-driver` "feature" internal-only (no longer documented, not part of the public API), and adds a second internal-only feature, `io-readiness`, to group together the linked list part of registration that is only used by some of the I/O types.

After a bit of discussion, changing stream-based transports (like `TcpStream`) to have `async fn read(&self)` is punted, since that is likely too easy of a footgun to activate.

Refs: #2779, #2728
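The tick handshake in the second bullet is the subtle part. Below is a self-contained sketch of the guard it describes; the field packing, names, and atomic orderings are assumptions for illustration, not tokio's actual implementation. The rule is the one stated above: a `ReadyEvent` may only clear readiness if the driver tick hasn't advanced since the event was produced.

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Illustrative packing (an assumption, not tokio's real layout):
    // the low 8 bits hold the driver tick, the rest hold readiness bits.
    const TICK_MASK: usize = 0xff;

    // Stand-in for the event returned when polling the registration.
    struct ReadyEvent {
        tick: usize,
    }

    struct Registration {
        state: AtomicUsize,
    }

    impl Registration {
        // Called after a syscall returns `WouldBlock`: drop the stored
        // readiness only while the driver tick still matches `ev.tick`.
        // If the driver polled again in the meantime (tick advanced),
        // the freshly recorded readiness is kept, so a wakeup that raced
        // with this clear is not lost.
        fn clear_readiness(&self, ev: ReadyEvent) {
            let mut current = self.state.load(Ordering::Acquire);
            loop {
                if current & TICK_MASK != ev.tick {
                    return; // stale event: keep the newer readiness
                }
                // Keep the tick bits, clear the readiness bits.
                let next = current & TICK_MASK;
                match self.state.compare_exchange(
                    current,
                    next,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => return,
                    Err(actual) => current = actual,
                }
            }
        }
    }

The hunks below are the consumer side of that contract: each `WouldBlock` arm hands its `ReadyEvent` back via `clear_readiness(ev)` and loops to poll again, replacing the old `clear_read_ready(cx)` / `Poll::Pending` pair.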
Diffstat (limited to 'tokio/src/net/tcp')
-rw-r--r--  tokio/src/net/tcp/listener.rs   20
-rw-r--r--  tokio/src/net/tcp/stream.rs    254
2 files changed, 135 insertions, 139 deletions
diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs
index 0d7bbdbb..323b8bca 100644
--- a/tokio/src/net/tcp/listener.rs
+++ b/tokio/src/net/tcp/listener.rs
@@ -205,15 +205,16 @@ impl TcpListener {
&mut self,
cx: &mut Context<'_>,
) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
- match self.io.get_ref().accept_std() {
- Ok(pair) => Poll::Ready(Ok(pair)),
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ match self.io.get_ref().accept_std() {
+ Ok(pair) => return Poll::Ready(Ok(pair)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -411,11 +412,6 @@ impl TryFrom<TcpListener> for mio::net::TcpListener {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: TcpListener) -> Result<Self, Self::Error> {
value.io.into_inner()
}
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index e0348724..f4f705b4 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -299,15 +299,16 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
- match self.io.get_ref().peek(buf) {
- Ok(ret) => Poll::Ready(Ok(ret)),
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
+ match self.io.get_ref().peek(buf) {
+ Ok(ret) => return Poll::Ready(Ok(ret)),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -703,26 +704,28 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
- ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
-
- // Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes.
- let b =
- unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
- match self.io.get_ref().read(b) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_read_ready(cx, mio::Ready::readable())?;
- Poll::Pending
- }
- Ok(n) => {
- // Safety: We trust `TcpStream::read` to have filled up `n` bytes
- // in the buffer.
- unsafe {
- buf.assume_init(n);
+ loop {
+ let ev = ready!(self.io.poll_read_ready(cx))?;
+
+ // Safety: `TcpStream::read` will not peek at the maybe uninitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+ match self.io.get_ref().read(b) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Ok(n) => {
+ // Safety: We trust `TcpStream::read` to have filled up `n` bytes
+ // in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.add_filled(n);
+ return Poll::Ready(Ok(()));
}
- buf.add_filled(n);
- Poll::Ready(Ok(()))
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
@@ -731,14 +734,15 @@ impl TcpStream {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
- ready!(self.io.poll_write_ready(cx))?;
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
- match self.io.get_ref().write(buf) {
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+ match self.io.get_ref().write(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ x => return Poll::Ready(x),
}
- x => Poll::Ready(x),
}
}
@@ -749,99 +753,100 @@ impl TcpStream {
) -> Poll<io::Result<usize>> {
use std::io::IoSlice;
- ready!(self.io.poll_write_ready(cx))?;
-
- // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
- // a dummy version from a 1-length slice which we'll overwrite with
- // the `bytes_vectored` method.
- static S: &[u8] = &[0];
- const MAX_BUFS: usize = 64;
-
- // IoSlice isn't Copy, so we must expand this manually ;_;
- let mut slices: [IoSlice<'_>; MAX_BUFS] = [
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- IoSlice::new(S),
- ];
- let cnt = buf.bytes_vectored(&mut slices);
-
- let iovec = <&IoVec>::from(S);
- let mut vecs = [iovec; MAX_BUFS];
- for i in 0..cnt {
- vecs[i] = (*slices[i]).into();
- }
-
- match self.io.get_ref().write_bufs(&vecs[..cnt]) {
- Ok(n) => {
- buf.advance(n);
- Poll::Ready(Ok(n))
+ loop {
+ let ev = ready!(self.io.poll_write_ready(cx))?;
+
+ // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
+ // a dummy version from a 1-length slice which we'll overwrite with
+ // the `bytes_vectored` method.
+ static S: &[u8] = &[0];
+ const MAX_BUFS: usize = 64;
+
+ // IoSlice isn't Copy, so we must expand this manually ;_;
+ let mut slices: [IoSlice<'_>; MAX_BUFS] = [
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ IoSlice::new(S),
+ ];
+ let cnt = buf.bytes_vectored(&mut slices);
+
+ let iovec = <&IoVec>::from(S);
+ let mut vecs = [iovec; MAX_BUFS];
+ for i in 0..cnt {
+ vecs[i] = (*slices[i]).into();
}
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- self.io.clear_write_ready(cx)?;
- Poll::Pending
+
+ match self.io.get_ref().write_bufs(&vecs[..cnt]) {
+ Ok(n) => {
+ buf.advance(n);
+ return Poll::Ready(Ok(n));
+ }
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_readiness(ev);
+ }
+ Err(e) => return Poll::Ready(Err(e)),
}
- Err(e) => Poll::Ready(Err(e)),
}
}
}
@@ -850,11 +855,6 @@ impl TryFrom<TcpStream> for mio::net::TcpStream {
type Error = io::Error;
/// Consumes value, returning the mio I/O object.
- ///
- /// See [`PollEvented::into_inner`] for more details about
- /// resource deregistration that happens during the call.
- ///
- /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: TcpStream) -> Result<Self, Self::Error> {
value.io.into_inner()
}