summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSean McArthur <sean@seanmonstar.com>2020-09-23 13:02:15 -0700
committerGitHub <noreply@github.com>2020-09-23 13:02:15 -0700
commita0557840eb424e174bf81a0175c40f9e176a2cc2 (patch)
tree676d33bf4144f0c0aac5af9f826ecc216a1d50e2
parentf25f12d57638a2928b3f738b3b1392d8773e276e (diff)
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways: - Cleans up the cached readiness in `PollEvented`. This cache used to be helpful when readiness was a linked list of `*mut Node`s in `Registration`. Previous refactors have turned `Registration` into just an `AtomicUsize` holding the current readiness, so the cache is just extra work and complexity. Gone. - Polling the `Registration` for readiness now gives a `ReadyEvent`, which includes the driver tick. This event must be passed back into `clear_readiness`, so that the readiness is only cleared from `Registration` if the tick hasn't changed. Previously, it was possible to clear the readiness even though another thread had *just* polled the driver and found the socket ready again. - Registration now also contains an `async fn readiness`, which stores wakers in an instrusive linked list. This allows an unbounded number of tasks to register for readiness (previously, only 1 per direction (read and write)). By using the intrusive linked list, there is no concern of leaking the storage of the wakers, since they are stored inside the `async fn` and released when the future is dropped. - Registration retains a `poll_readiness(Direction)` method, to support `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and so there are 2 reserved slots for those methods. - IO types where it makes sense to have multiple tasks waiting on them now take advantage of this new `async fn readiness`, such as `UdpSocket` and `UnixDatagram`. Additionally, this makes the `io-driver` "feature" internal-only (no longer documented, not part of public API), and adds a second internal-only feature, `io-readiness`, to group together linked list part of registration that is only used by some of the IO types. After a bit of discussion, changing stream-based transports (like `TcpStream`) to have `async fn read(&self)` is punted, since that is likely too easy of a footgun to activate. Refs: #2779, #2728
-rw-r--r--.github/workflows/ci.yml4
-rw-r--r--examples/connect.rs8
-rw-r--r--examples/echo-udp.rs2
-rw-r--r--examples/udp-client.rs2
-rw-r--r--examples/udp-codec.rs6
-rw-r--r--tokio-util/Cargo.toml3
-rw-r--r--tokio-util/src/cfg.rs2
-rw-r--r--tokio-util/src/lib.rs5
-rw-r--r--tokio-util/tests/udp.rs2
-rw-r--r--tokio/Cargo.toml8
-rw-r--r--tokio/src/io/driver/mod.rs57
-rw-r--r--tokio/src/io/driver/scheduled_io.rs410
-rw-r--r--tokio/src/io/mod.rs4
-rw-r--r--tokio/src/io/poll_evented.rs295
-rw-r--r--tokio/src/io/registration.rs237
-rw-r--r--tokio/src/lib.rs4
-rw-r--r--tokio/src/macros/cfg.rs10
-rw-r--r--tokio/src/net/tcp/listener.rs20
-rw-r--r--tokio/src/net/tcp/stream.rs254
-rw-r--r--tokio/src/net/udp/mod.rs4
-rw-r--r--tokio/src/net/udp/socket.rs116
-rw-r--r--tokio/src/net/udp/split.rs148
-rw-r--r--tokio/src/net/unix/datagram/mod.rs5
-rw-r--r--tokio/src/net/unix/datagram/socket.rs227
-rw-r--r--tokio/src/net/unix/datagram/split.rs68
-rw-r--r--tokio/src/net/unix/datagram/split_owned.rs148
-rw-r--r--tokio/src/net/unix/listener.rs30
-rw-r--r--tokio/src/net/unix/stream.rs54
-rw-r--r--tokio/src/signal/unix/driver.rs43
-rw-r--r--tokio/src/sync/task/atomic_waker.rs5
-rw-r--r--tokio/src/util/bit.rs2
-rw-r--r--tokio/src/util/linked_list.rs54
-rw-r--r--tokio/src/util/mod.rs2
-rw-r--r--tokio/src/util/slab.rs2
-rw-r--r--tokio/tests/async_send_sync.rs4
-rw-r--r--tokio/tests/rt_common.rs4
-rw-r--r--tokio/tests/udp.rs34
-rw-r--r--tokio/tests/uds_datagram.rs34
38 files changed, 888 insertions, 1429 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 756a7677..16da8c95 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -150,11 +150,11 @@ jobs:
run: cargo install cargo-hack
- name: check --each-feature
- run: cargo hack check --all --each-feature -Z avoid-dev-deps
+ run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
# Try with unstable feature flags
- name: check --each-feature --unstable
- run: cargo hack check --all --each-feature -Z avoid-dev-deps
+ run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
env:
RUSTFLAGS: --cfg tokio_unstable -Dwarnings
diff --git a/examples/connect.rs b/examples/connect.rs
index 5d0515a7..6e909b25 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -96,7 +96,6 @@ mod udp {
use std::error::Error;
use std::io;
use std::net::SocketAddr;
- use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;
pub async fn connect(
@@ -114,16 +113,15 @@ mod udp {
let socket = UdpSocket::bind(&bind_addr).await?;
socket.connect(addr).await?;
- let (mut r, mut w) = socket.split();
- future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?;
+ future::try_join(send(stdin, &socket), recv(stdout, &socket)).await?;
Ok(())
}
async fn send(
mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
- writer: &mut SendHalf,
+ writer: &UdpSocket,
) -> Result<(), io::Error> {
while let Some(item) = stdin.next().await {
let buf = item?;
@@ -135,7 +133,7 @@ mod udp {
async fn recv(
mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
- reader: &mut RecvHalf,
+ reader: &UdpSocket,
) -> Result<(), io::Error> {
loop {
let mut buf = vec![0; 1024];
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index bc688b9b..3027c869 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -26,7 +26,7 @@ struct Server {
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
- mut socket,
+ socket,
mut buf,
mut to_send,
} = self;
diff --git a/examples/udp-client.rs b/examples/udp-client.rs
index a191033d..a394ee66 100644
--- a/examples/udp-client.rs
+++ b/examples/udp-client.rs
@@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
.parse()?;
- let mut socket = UdpSocket::bind(local_addr).await?;
+ let socket = UdpSocket::bind(local_addr).await?;
const MAX_DATAGRAM_SIZE: usize = 65_507;
socket.connect(&remote_addr).await?;
let data = get_stdin_data()?;
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index 8b64cbc3..f338bd84 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -1,3 +1,8 @@
+fn main() {}
+
+// Disabled while future of UdpFramed is decided on.
+// See https://github.com/tokio-rs/tokio/issues/2830
+/*
//! This example leverages `BytesCodec` to create a UDP client and server which
//! speak a custom protocol.
//!
@@ -78,3 +83,4 @@ async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
Ok(())
}
+*/
diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml
index 85b4e592..45daa2b1 100644
--- a/tokio-util/Cargo.toml
+++ b/tokio-util/Cargo.toml
@@ -25,11 +25,10 @@ publish = false
default = []
# Shorthand for enabling everything
-full = ["codec", "udp", "compat", "io"]
+full = ["codec", "compat", "io"]
compat = ["futures-io",]
codec = ["tokio/stream"]
-udp = ["tokio/udp"]
io = []
[dependencies]
diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs
index 2efa5f09..f9176747 100644
--- a/tokio-util/src/cfg.rs
+++ b/tokio-util/src/cfg.rs
@@ -18,6 +18,7 @@ macro_rules! cfg_compat {
}
}
+/*
macro_rules! cfg_udp {
($($item:item)*) => {
$(
@@ -27,6 +28,7 @@ macro_rules! cfg_udp {
)*
}
}
+*/
macro_rules! cfg_io {
($($item:item)*) => {
diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs
index 24a8af95..b96d9044 100644
--- a/tokio-util/src/lib.rs
+++ b/tokio-util/src/lib.rs
@@ -30,9 +30,14 @@ cfg_codec! {
pub mod codec;
}
+/*
+Disabled due to removal of poll_ functions on UdpSocket.
+
+See https://github.com/tokio-rs/tokio/issues/2830
cfg_udp! {
pub mod udp;
}
+*/
cfg_compat! {
pub mod compat;
diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs
index 4820ac72..99763854 100644
--- a/tokio-util/tests/udp.rs
+++ b/tokio-util/tests/udp.rs
@@ -1,3 +1,4 @@
+/*
#![warn(rust_2018_idioms)]
use tokio::{net::UdpSocket, stream::StreamExt};
@@ -100,3 +101,4 @@ async fn send_framed_lines_codec() -> std::io::Result<()> {
Ok(())
}
+*/
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index b1d943e3..6df368eb 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -33,7 +33,6 @@ full = [
"blocking",
"dns",
"fs",
- "io-driver",
"io-util",
"io-std",
"macros",
@@ -51,7 +50,8 @@ full = [
blocking = ["rt-core"]
dns = ["rt-core"]
fs = ["rt-core", "io-util"]
-io-driver = ["mio", "lazy_static"]
+io-driver = ["mio", "lazy_static"] # internal only
+io-readiness = [] # internal only
io-util = ["memchr"]
# stdin, stdout, stderr
io-std = ["rt-core"]
@@ -85,8 +85,8 @@ sync = ["fnv"]
test-util = []
tcp = ["io-driver", "iovec"]
time = ["slab"]
-udp = ["io-driver"]
-uds = ["io-driver", "mio-uds", "libc"]
+udp = ["io-driver", "io-readiness"]
+uds = ["io-driver", "io-readiness", "mio-uds", "libc"]
[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index d8c253ab..30b30203 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -12,14 +12,13 @@ use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
-use std::task::Waker;
use std::time::Duration;
/// I/O driver, backed by Mio
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`
- tick: u16,
+ tick: u8,
/// Reuse the `mio::Events` value across calls to poll.
events: Option<mio::Events>,
@@ -40,6 +39,11 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}
+pub(crate) struct ReadyEvent {
+ tick: u8,
+ readiness: mio::Ready,
+}
+
pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
@@ -57,6 +61,11 @@ pub(super) enum Direction {
Write,
}
+enum Tick {
+ Set(u8),
+ Clear(u8),
+}
+
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
@@ -122,11 +131,11 @@ impl Driver {
fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab
- const COMPACT_INTERVAL: u16 = 256;
+ const COMPACT_INTERVAL: u8 = 255;
self.tick = self.tick.wrapping_add(1);
- if self.tick % COMPACT_INTERVAL == 0 {
+ if self.tick == COMPACT_INTERVAL {
self.resources.compact();
}
@@ -160,9 +169,6 @@ impl Driver {
}
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
- let mut rd = None;
- let mut wr = None;
-
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
let io = match self.resources.get(addr) {
@@ -170,29 +176,15 @@ impl Driver {
None => return,
};
- if io
- .set_readiness(Some(token.0), |curr| curr | ready.as_usize())
- .is_err()
- {
+ let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
+ curr | ready.as_usize()
+ });
+ if set.is_err() {
// token no longer valid!
return;
}
- if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
- wr = io.writer.take_waker();
- }
-
- if !(ready & (!mio::Ready::writable())).is_empty() {
- rd = io.reader.take_waker();
- }
-
- if let Some(w) = rd {
- w.wake();
- }
-
- if let Some(w) = wr {
- w.wake();
- }
+ io.wake(ready);
}
}
@@ -202,8 +194,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
- io.reader.wake();
- io.writer.wake();
+ io.wake(mio::Ready::all());
})
}
}
@@ -310,16 +301,6 @@ impl Inner {
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
}
-
- /// Registers interest in the I/O resource associated with `token`.
- pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
- let waker = match dir {
- Direction::Read => &io.reader,
- Direction::Write => &io.writer,
- };
-
- waker.register(w);
- }
}
impl Direction {
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index 566f7daf..48c56a19 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -1,8 +1,21 @@
-use crate::loom::future::AtomicWaker;
+use super::{platform, Direction, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Mutex;
+use crate::util::bit;
use crate::util::slab::Entry;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::task::{Context, Poll, Waker};
+
+cfg_io_readiness! {
+ use crate::util::linked_list::{self, LinkedList};
+
+ use std::cell::UnsafeCell;
+ use std::future::Future;
+ use std::marker::PhantomPinned;
+ use std::pin::Pin;
+ use std::ptr::NonNull;
+}
/// Stored in the I/O driver resource slab.
#[derive(Debug)]
@@ -10,19 +23,84 @@ pub(crate) struct ScheduledIo {
/// Packs the resource's readiness with the resource's generation.
readiness: AtomicUsize,
- /// Task waiting on read readiness
- pub(crate) reader: AtomicWaker,
+ waiters: Mutex<Waiters>,
+}
+
+#[cfg(feature = "io-readiness")]
+type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
+
+#[derive(Debug, Default)]
+struct Waiters {
+ #[cfg(feature = "io-readiness")]
+ /// List of all current waiters
+ list: WaitList,
+
+ /// Waker used for AsyncRead
+ reader: Option<Waker>,
+
+ /// Waker used for AsyncWrite
+ writer: Option<Waker>,
+}
+
+cfg_io_readiness! {
+ #[derive(Debug)]
+ struct Waiter {
+ pointers: linked_list::Pointers<Waiter>,
+
+ /// The waker for this task
+ waker: Option<Waker>,
+
+ /// The interest this waiter is waiting on
+ interest: mio::Ready,
+
+ is_ready: bool,
+
+ /// Should never be `!Unpin`
+ _p: PhantomPinned,
+ }
+
+ /// Future returned by `readiness()`
+ struct Readiness<'a> {
+ scheduled_io: &'a ScheduledIo,
+
+ state: State,
+
+ /// Entry in the waiter `LinkedList`.
+ waiter: UnsafeCell<Waiter>,
+ }
+
+ enum State {
+ Init,
+ Waiting,
+ Done,
+ }
+}
+
+// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
+//
+// | reserved | generation | driver tick | readinesss |
+// |----------+------------+--------------+------------|
+// | 1 bit | 7 bits + 8 bits + 16 bits |
+
+const READINESS: bit::Pack = bit::Pack::least_significant(16);
+
+const TICK: bit::Pack = READINESS.then(8);
+
+const GENERATION: bit::Pack = TICK.then(7);
- /// Task waiting on write readiness
- pub(crate) writer: AtomicWaker,
+#[test]
+fn test_generations_assert_same() {
+ assert_eq!(super::GENERATION, GENERATION);
}
+// ===== impl ScheduledIo =====
+
impl Entry for ScheduledIo {
fn reset(&self) {
let state = self.readiness.load(Acquire);
- let generation = super::GENERATION.unpack(state);
- let next = super::GENERATION.pack_lossy(generation + 1, 0);
+ let generation = GENERATION.unpack(state);
+ let next = GENERATION.pack_lossy(generation + 1, 0);
self.readiness.store(next, Release);
}
@@ -32,15 +110,14 @@ impl Default for ScheduledIo {
fn default() -> ScheduledIo {
ScheduledIo {
readiness: AtomicUsize::new(0),
- reader: AtomicWaker::new(),
- writer: AtomicWaker::new(),
+ waiters: Mutex::new(Default::default()),
}
}
}
impl ScheduledIo {
pub(crate) fn generation(&self) -> usize {
- super::GENERATION.unpack(self.readiness.load(Acquire))
+ GENERATION.unpack(self.readiness.load(Acquire))
}
/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
@@ -48,6 +125,8 @@ impl ScheduledIo {
///
/// # Arguments
/// - `token`: the token for this `ScheduledIo`.
+ /// - `tick`: whether setting the tick or trying to clear readiness for a
+ /// specific tick.
/// - `f`: a closure returning a new readiness value given the previous
/// readiness.
///
@@ -57,51 +136,330 @@ impl ScheduledIo {
/// generation, then the corresponding IO resource has been removed and
/// replaced with a new resource. In that case, this method returns `Err`.
/// Otherwise, this returns the previous readiness.
- pub(crate) fn set_readiness(
+ pub(super) fn set_readiness(
&self,
token: Option<usize>,
+ tick: Tick,
f: impl Fn(usize) -> usize,
) -> Result<usize, ()> {
let mut current = self.readiness.load(Acquire);
loop {
- let current_generation = super::GENERATION.unpack(current);
+ let current_generation = GENERATION.unpack(current);
if let Some(token) = token {
// Check that the generation for this access is still the
// current one.
- if super::GENERATION.unpack(token) != current_generation {
+ if GENERATION.unpack(token) != current_generation {
return Err(());
}
}
- // Mask out the generation bits so that the modifying function
- // doesn't see them.
+ // Mask out the tick/generation bits so that the modifying
+ // function doesn't see them.
let current_readiness = current & mio::Ready::all().as_usize();
- let new = f(current_readiness);
+ let mut new = f(current_readiness);
debug_assert!(
- new <= super::ADDRESS.max_value(),
- "new readiness value would overwrite generation bits!"
+ new <= READINESS.max_value(),
+ "new readiness value would overwrite tick/generation bits!"
);
- match self.readiness.compare_exchange(
- current,
- super::GENERATION.pack(current_generation, new),
- AcqRel,
- Acquire,
- ) {
+ match tick {
+ Tick::Set(t) => {
+ new = TICK.pack(t as usize, new);
+ }
+ Tick::Clear(t) => {
+ if TICK.unpack(current) as u8 != t {
+ // Trying to clear readiness with an old event!
+ return Err(());
+ }
+ new = TICK.pack(t as usize, new);
+ }
+ }
+
+ new = GENERATION.pack(current_generation, new);
+
+ match self
+ .readiness
+ .compare_exchange(current, new, AcqRel, Acquire)
+ {
Ok(_) => return Ok(current),
// we lost the race, retry!
Err(actual) => current = actual,
}
}
}
+
+ pub(super) fn wake(&self, ready: mio::Ready) {
+ let mut waiters = self.waiters.lock().unwrap();
+
+ // check for AsyncRead slot
+ if !(ready & (!mio::Ready::writable())).is_empty() {
+ if let Some(waker) = waiters.reader.take() {
+ waker.wake();
+ }
+ }
+
+ // check for AsyncWrite slot
+ if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
+ if let Some(waker) = waiters.writer.take() {
+ waker.wake();
+ }
+ }
+
+ #[cfg(feature = "io-readiness")]
+ {
+ // check list of waiters
+ for waiter in waiters
+ .list
+ .drain_filter(|w| !(w.interest & ready).is_empty())
+ {
+ let waiter = unsafe { &mut *waiter.as_ptr() };
+ if let Some(waker) = waiter.waker.take() {
+ waiter.is_ready = true;
+ waker.wake();
+ }
+ }
+ }
+ }
+
+ /// Poll version of checking readiness for a certain direction.
+ ///
+ /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
+ /// which cannot use the `async fn` version. This uses reserved reader
+ /// and writer slots.
+ pub(in crate::io) fn poll_readiness(
+ &self,
+ cx: &mut Context<'_>,
+ direction: Direction,
+ ) -> Poll<ReadyEvent> {
+ let curr = self.readiness.load(Acquire);
+
+ let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr));
+
+ if ready.is_empty() {
+ // Update the task info
+ let mut waiters = self.waiters.lock().unwrap();
+ let slot = match direction {
+ Direction::Read => &mut waiters.reader,
+ Direction::Write => &mut waiters.writer,
+ };
+ *slot = Some(cx.waker().clone());
+
+ // Try again, in case the readiness was changed while we were
+ // taking the waiters lock
+ let curr = self.readiness.load(Acquire);
+ let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr));
+ if ready.is_empty() {
+ Poll::Pending
+ } else {
+ Poll::Ready(ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ readiness: ready,
+ })
+ }
+ } else {
+ Poll::Ready(ReadyEvent {
+ tick: TICK.unpack(curr) as u8,
+ readiness: ready,
+ })
+ }
+ }
+
+ pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
+ // This consumes the current readiness state **except** for HUP and
+ // error. HUP and error are excluded because a) they are final states
+ // and never transition out and b) both the read AND the write
+ // directions need to be able to obvserve these states.
+ //
+ // # Platform-specific behavior
+ //
+ // HUP and error readiness are platform-specific. On epoll platforms,
+ // HUP has specific conditions that must be met by both peers of a
+ // connection in order to be triggered.
+ //
+ // On epoll platforms, `EPOLLERR` is signaled through
+ // `UnixReady::error()` and is important to be observable by both read
+ // AND write. A specific case that `EPOLLERR` occurs is when the read
+ // end of a pipe is closed. When this occurs, a peer blocked by
+ // writing to the pipe should be notified.
+ let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize();
+
+ // result isn't important
+ let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup));
+ }
}
impl Drop for ScheduledIo {
fn drop(&mut self) {
- self.writer.wake();
- self.reader.wake();
+ self.wake(mio::Ready::all());
+ }
+}
+
+unsafe impl Send for ScheduledIo {}
+unsafe impl Sync for ScheduledIo {}
+
+cfg_io_readiness! {
+ impl ScheduledIo {
+ /// An async version of `poll_readiness` which uses a linked list of wakers
+ pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent {
+ self.readiness_fut(interest).await
+ }
+
+ // This is in a separate function so that the borrow checker doesn't think
+ // we are borrowing the `UnsafeCell` possibly over await boundaries.
+ //
+ // Go figure.
+ fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> {
+ Readiness {
+ scheduled_io: self,
+ state: State::Init,
+ waiter: UnsafeCell::new(Waiter {
+ pointers: linked_list::Pointers::new(),
+ waker: None,
+ is_ready: false,
+ interest,
+ _p: PhantomPinned,
+ }),
+ }
+ }
+ }
+
+ unsafe impl linked_list::Link for Waiter {
+ type Handle = NonNull<Waiter>;
+ type Target = Waiter;
+
+ fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
+ *handle
+ }
+
+ unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
+ ptr
+ }
+
+ unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
+ NonNull::from(&mut target.as_mut().pointers)
+ }
+ }
+
+ // ===== impl Readiness =====
+
+ impl Future for Readiness<'_> {
+ type Output = ReadyEvent;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ use std::sync::atomic::Ordering::SeqCst;
+
+ let (scheduled_io, state, waiter) = unsafe {
+ let me = self.get_unchecked_mut();
+ (&me.scheduled_io, &mut me.state, &me.waiter)
+ };
+
+ loop {
+ match *state {
+ State::Init => {
+ // Optimistically check existing readiness
+ let curr = scheduled_io.readiness.load(SeqCst);
+ let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
+
+ // Safety: `waiter.interest` never changes
+ let interest = unsafe { (*waiter.get()).interest };
+
+ if readiness.contains(interest) {
+ // Currently ready!
+ let tick = TICK.unpack(curr) as u8;
+ *state = State::Done;
+ return Poll::Ready(ReadyEvent { readiness, tick });
+ }
+
+ // Wasn't ready, take the lock (and check again while locked).
+ let mut waiters = scheduled_io.waiters.lock().unwrap();
+
+ let curr = scheduled_io.readiness.load(SeqCst);
+ let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
+
+ if readiness.contains(interest) {
+ // Currently ready!
+ let tick = TICK.unpack(curr) as u8;
+ *state = State::Done;
+ return Poll::Ready(ReadyEvent { readiness, tick });
+