diff options
author | Sean McArthur <sean@seanmonstar.com> | 2020-09-23 13:02:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-23 13:02:15 -0700 |
commit | a0557840eb424e174bf81a0175c40f9e176a2cc2 (patch) | |
tree | 676d33bf4144f0c0aac5af9f826ecc216a1d50e2 /tokio | |
parent | f25f12d57638a2928b3f738b3b1392d8773e276e (diff) |
io: use intrusive wait list for I/O driver (#2828)
This refactors I/O registration in a few ways:
- Cleans up the cached readiness in `PollEvented`. This cache used to
be helpful when readiness was a linked list of `*mut Node`s in
`Registration`. Previous refactors have turned `Registration` into just
an `AtomicUsize` holding the current readiness, so the cache is just
extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
which includes the driver tick. This event must be passed back into
`clear_readiness`, so that the readiness is only cleared from `Registration`
if the tick hasn't changed. Previously, it was possible to clear the
readiness even though another thread had *just* polled the driver and
found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
wakers in an instrusive linked list. This allows an unbounded number
of tasks to register for readiness (previously, only 1 per direction (read
and write)). By using the intrusive linked list, there is no concern of
leaking the storage of the wakers, since they are stored inside the `async fn`
and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
`AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
now take advantage of this new `async fn readiness`, such as `UdpSocket`
and `UnixDatagram`.
Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.
After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.
Refs: #2779, #2728
Diffstat (limited to 'tokio')
29 files changed, 865 insertions, 1418 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index b1d943e3..6df368eb 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -33,7 +33,6 @@ full = [ "blocking", "dns", "fs", - "io-driver", "io-util", "io-std", "macros", @@ -51,7 +50,8 @@ full = [ blocking = ["rt-core"] dns = ["rt-core"] fs = ["rt-core", "io-util"] -io-driver = ["mio", "lazy_static"] +io-driver = ["mio", "lazy_static"] # internal only +io-readiness = [] # internal only io-util = ["memchr"] # stdin, stdout, stderr io-std = ["rt-core"] @@ -85,8 +85,8 @@ sync = ["fnv"] test-util = [] tcp = ["io-driver", "iovec"] time = ["slab"] -udp = ["io-driver"] -uds = ["io-driver", "mio-uds", "libc"] +udp = ["io-driver", "io-readiness"] +uds = ["io-driver", "io-readiness", "mio-uds", "libc"] [dependencies] tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true } diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index d8c253ab..30b30203 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -12,14 +12,13 @@ use mio::event::Evented; use std::fmt; use std::io; use std::sync::{Arc, Weak}; -use std::task::Waker; use std::time::Duration; /// I/O driver, backed by Mio pub(crate) struct Driver { /// Tracks the number of times `turn` is called. It is safe for this to wrap /// as it is mostly used to determine when to call `compact()` - tick: u16, + tick: u8, /// Reuse the `mio::Events` value across calls to poll. events: Option<mio::Events>, @@ -40,6 +39,11 @@ pub(crate) struct Handle { inner: Weak<Inner>, } +pub(crate) struct ReadyEvent { + tick: u8, + readiness: mio::Ready, +} + pub(super) struct Inner { /// The underlying system event queue. io: mio::Poll, @@ -57,6 +61,11 @@ pub(super) enum Direction { Write, } +enum Tick { + Set(u8), + Clear(u8), +} + // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup // token. const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); @@ -122,11 +131,11 @@ impl Driver { fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> { // How often to call `compact()` on the resource slab - const COMPACT_INTERVAL: u16 = 256; + const COMPACT_INTERVAL: u8 = 255; self.tick = self.tick.wrapping_add(1); - if self.tick % COMPACT_INTERVAL == 0 { + if self.tick == COMPACT_INTERVAL { self.resources.compact(); } @@ -160,9 +169,6 @@ impl Driver { } fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { - let mut rd = None; - let mut wr = None; - let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); let io = match self.resources.get(addr) { @@ -170,29 +176,15 @@ impl Driver { None => return, }; - if io - .set_readiness(Some(token.0), |curr| curr | ready.as_usize()) - .is_err() - { + let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| { + curr | ready.as_usize() + }); + if set.is_err() { // token no longer valid! return; } - if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { - wr = io.writer.take_waker(); - } - - if !(ready & (!mio::Ready::writable())).is_empty() { - rd = io.reader.take_waker(); - } - - if let Some(w) = rd { - w.wake(); - } - - if let Some(w) = wr { - w.wake(); - } + io.wake(ready); } } @@ -202,8 +194,7 @@ impl Drop for Driver { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the // driver being shutdown. - io.reader.wake(); - io.writer.wake(); + io.wake(mio::Ready::all()); }) } } @@ -310,16 +301,6 @@ impl Inner { pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { self.io.deregister(source) } - - /// Registers interest in the I/O resource associated with `token`. - pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) { - let waker = match dir { - Direction::Read => &io.reader, - Direction::Write => &io.writer, - }; - - waker.register(w); - } } impl Direction { diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 566f7daf..48c56a19 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,8 +1,21 @@ -use crate::loom::future::AtomicWaker; +use super::{platform, Direction, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; +use crate::util::bit; use crate::util::slab::Entry; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::task::{Context, Poll, Waker}; + +cfg_io_readiness! { + use crate::util::linked_list::{self, LinkedList}; + + use std::cell::UnsafeCell; + use std::future::Future; + use std::marker::PhantomPinned; + use std::pin::Pin; + use std::ptr::NonNull; +} /// Stored in the I/O driver resource slab. #[derive(Debug)] @@ -10,19 +23,84 @@ pub(crate) struct ScheduledIo { /// Packs the resource's readiness with the resource's generation. readiness: AtomicUsize, - /// Task waiting on read readiness - pub(crate) reader: AtomicWaker, + waiters: Mutex<Waiters>, +} + +#[cfg(feature = "io-readiness")] +type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; + +#[derive(Debug, Default)] +struct Waiters { + #[cfg(feature = "io-readiness")] + /// List of all current waiters + list: WaitList, + + /// Waker used for AsyncRead + reader: Option<Waker>, + + /// Waker used for AsyncWrite + writer: Option<Waker>, +} + +cfg_io_readiness! { + #[derive(Debug)] + struct Waiter { + pointers: linked_list::Pointers<Waiter>, + + /// The waker for this task + waker: Option<Waker>, + + /// The interest this waiter is waiting on + interest: mio::Ready, + + is_ready: bool, + + /// Should never be `!Unpin` + _p: PhantomPinned, + } + + /// Future returned by `readiness()` + struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, + + state: State, + + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell<Waiter>, + } + + enum State { + Init, + Waiting, + Done, + } +} + +// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. +// +// | reserved | generation | driver tick | readinesss | +// |----------+------------+--------------+------------| +// | 1 bit | 7 bits + 8 bits + 16 bits | + +const READINESS: bit::Pack = bit::Pack::least_significant(16); + +const TICK: bit::Pack = READINESS.then(8); + +const GENERATION: bit::Pack = TICK.then(7); - /// Task waiting on write readiness - pub(crate) writer: AtomicWaker, +#[test] +fn test_generations_assert_same() { + assert_eq!(super::GENERATION, GENERATION); } +// ===== impl ScheduledIo ===== + impl Entry for ScheduledIo { fn reset(&self) { let state = self.readiness.load(Acquire); - let generation = super::GENERATION.unpack(state); - let next = super::GENERATION.pack_lossy(generation + 1, 0); + let generation = GENERATION.unpack(state); + let next = GENERATION.pack_lossy(generation + 1, 0); self.readiness.store(next, Release); } @@ -32,15 +110,14 @@ impl Default for ScheduledIo { fn default() -> ScheduledIo { ScheduledIo { readiness: AtomicUsize::new(0), - reader: AtomicWaker::new(), - writer: AtomicWaker::new(), + waiters: Mutex::new(Default::default()), } } } impl ScheduledIo { pub(crate) fn generation(&self) -> usize { - super::GENERATION.unpack(self.readiness.load(Acquire)) + GENERATION.unpack(self.readiness.load(Acquire)) } /// Sets the readiness on this `ScheduledIo` by invoking the given closure on @@ -48,6 +125,8 @@ impl ScheduledIo { /// /// # Arguments /// - `token`: the token for this `ScheduledIo`. + /// - `tick`: whether setting the tick or trying to clear readiness for a + /// specific tick. /// - `f`: a closure returning a new readiness value given the previous /// readiness. /// @@ -57,51 +136,330 @@ impl ScheduledIo { /// generation, then the corresponding IO resource has been removed and /// replaced with a new resource. In that case, this method returns `Err`. /// Otherwise, this returns the previous readiness. - pub(crate) fn set_readiness( + pub(super) fn set_readiness( &self, token: Option<usize>, + tick: Tick, f: impl Fn(usize) -> usize, ) -> Result<usize, ()> { let mut current = self.readiness.load(Acquire); loop { - let current_generation = super::GENERATION.unpack(current); + let current_generation = GENERATION.unpack(current); if let Some(token) = token { // Check that the generation for this access is still the // current one. - if super::GENERATION.unpack(token) != current_generation { + if GENERATION.unpack(token) != current_generation { return Err(()); } } - // Mask out the generation bits so that the modifying function - // doesn't see them. + // Mask out the tick/generation bits so that the modifying + // function doesn't see them. let current_readiness = current & mio::Ready::all().as_usize(); - let new = f(current_readiness); + let mut new = f(current_readiness); debug_assert!( - new <= super::ADDRESS.max_value(), - "new readiness value would overwrite generation bits!" + new <= READINESS.max_value(), + "new readiness value would overwrite tick/generation bits!" ); - match self.readiness.compare_exchange( - current, - super::GENERATION.pack(current_generation, new), - AcqRel, - Acquire, - ) { + match tick { + Tick::Set(t) => { + new = TICK.pack(t as usize, new); + } + Tick::Clear(t) => { + if TICK.unpack(current) as u8 != t { + // Trying to clear readiness with an old event! + return Err(()); + } + new = TICK.pack(t as usize, new); + } + } + + new = GENERATION.pack(current_generation, new); + + match self + .readiness + .compare_exchange(current, new, AcqRel, Acquire) + { Ok(_) => return Ok(current), // we lost the race, retry! Err(actual) => current = actual, } } } + + pub(super) fn wake(&self, ready: mio::Ready) { + let mut waiters = self.waiters.lock().unwrap(); + + // check for AsyncRead slot + if !(ready & (!mio::Ready::writable())).is_empty() { + if let Some(waker) = waiters.reader.take() { + waker.wake(); + } + } + + // check for AsyncWrite slot + if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { + if let Some(waker) = waiters.writer.take() { + waker.wake(); + } + } + + #[cfg(feature = "io-readiness")] + { + // check list of waiters + for waiter in waiters + .list + .drain_filter(|w| !(w.interest & ready).is_empty()) + { + let waiter = unsafe { &mut *waiter.as_ptr() }; + if let Some(waker) = waiter.waker.take() { + waiter.is_ready = true; + waker.wake(); + } + } + } + } + + /// Poll version of checking readiness for a certain direction. + /// + /// These are to support `AsyncRead` and `AsyncWrite` polling methods, + /// which cannot use the `async fn` version. This uses reserved reader + /// and writer slots. + pub(in crate::io) fn poll_readiness( + &self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll<ReadyEvent> { + let curr = self.readiness.load(Acquire); + + let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + + if ready.is_empty() { + // Update the task info + let mut waiters = self.waiters.lock().unwrap(); + let slot = match direction { + Direction::Read => &mut waiters.reader, + Direction::Write => &mut waiters.writer, + }; + *slot = Some(cx.waker().clone()); + + // Try again, in case the readiness was changed while we were + // taking the waiters lock + let curr = self.readiness.load(Acquire); + let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + if ready.is_empty() { + Poll::Pending + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + readiness: ready, + }) + } + } else { + Poll::Ready(ReadyEvent { + tick: TICK.unpack(curr) as u8, + readiness: ready, + }) + } + } + + pub(crate) fn clear_readiness(&self, event: ReadyEvent) { + // This consumes the current readiness state **except** for HUP and + // error. HUP and error are excluded because a) they are final states + // and never transition out and b) both the read AND the write + // directions need to be able to obvserve these states. + // + // # Platform-specific behavior + // + // HUP and error readiness are platform-specific. On epoll platforms, + // HUP has specific conditions that must be met by both peers of a + // connection in order to be triggered. + // + // On epoll platforms, `EPOLLERR` is signaled through + // `UnixReady::error()` and is important to be observable by both read + // AND write. A specific case that `EPOLLERR` occurs is when the read + // end of a pipe is closed. When this occurs, a peer blocked by + // writing to the pipe should be notified. + let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize(); + + // result isn't important + let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup)); + } } impl Drop for ScheduledIo { fn drop(&mut self) { - self.writer.wake(); - self.reader.wake(); + self.wake(mio::Ready::all()); + } +} + +unsafe impl Send for ScheduledIo {} +unsafe impl Sync for ScheduledIo {} + +cfg_io_readiness! { + impl ScheduledIo { + /// An async version of `poll_readiness` which uses a linked list of wakers + pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { + self.readiness_fut(interest).await + } + + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + is_ready: false, + interest, + _p: PhantomPinned, + }), + } + } + } + + unsafe impl linked_list::Link for Waiter { + type Handle = NonNull<Waiter>; + type Target = Waiter; + + fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { + *handle + } + + unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { + ptr + } + + unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { + NonNull::from(&mut target.as_mut().pointers) + } + } + + // ===== impl Readiness ===== + + impl Future for Readiness<'_> { + type Output = ReadyEvent; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + use std::sync::atomic::Ordering::SeqCst; + + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; + + loop { + match *state { + State::Init => { + // Optimistically check existing readiness + let curr = scheduled_io.readiness.load(SeqCst); + let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + + // Safety: `waiter.interest` never changes + let interest = unsafe { (*waiter.get()).interest }; + + if readiness.contains(interest) { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { readiness, tick }); + } + + // Wasn't ready, take the lock (and check again while locked). + let mut waiters = scheduled_io.waiters.lock().unwrap(); + + let curr = scheduled_io.readiness.load(SeqCst); + let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + + if readiness.contains(interest) { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { readiness, tick }); + } + + // Not ready even after locked, insert into list... + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); + } + + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; + } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock().unwrap(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.is_ready { + // Our waker has been notified. + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; + + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; + + return Poll::Ready(ReadyEvent { + tick, + readiness: w.interest, + }); + } + } + } + } + } + + impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock().unwrap(); + + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } } + + unsafe impl Send for Readiness<'_> {} + unsafe impl Sync for Readiness<'_> {} } diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index e81054bf..1ca3fe69 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -207,10 +207,10 @@ cfg_io_driver! { pub(crate) mod driver; mod poll_evented; - pub use poll_evented::PollEvented; + #[cfg(not(loom))] + pub(crate) use poll_evented::PollEvented; mod registration; - pub use registration::Registration; } cfg_io_std! { diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 9054c3b8..2c943ea4 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,13 +1,12 @@ -use crate::io::driver::platform; -use crate::io::{AsyncRead, AsyncWrite, ReadBuf, Registration}; +use crate::io::driver::{Direction, Handle, ReadyEvent}; +use crate::io::registration::Registration; +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use mio::event::Evented; use std::fmt; use std::io::{self, Read, Write}; use std::marker::Unpin; use std::pin::Pin; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; use std::task::{Context, Poll}; cfg_io_driver! { @@ -53,37 +52,6 @@ cfg_io_driver! { /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and /// [`clear_read_ready`]. /// - /// ```rust - /// use tokio::io::PollEvented; - /// - /// use futures::ready; - /// use mio::Ready; - /// use mio::net::{TcpStream, TcpListener}; - /// use std::io; - /// use std::task::{Context, Poll}; - /// - /// struct MyListener { - /// poll_evented: PollEvented<TcpListener>, - /// } - /// - /// impl MyListener { - /// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> { - /// let ready = Ready::readable(); - /// - /// ready!(self.poll_evented.poll_read_ready(cx, ready))?; - /// - /// match self.poll_evented.get_ref().accept() { - /// Ok((socket, _)) => Poll::Ready(Ok(socket)), - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// self.poll_evented.clear_read_ready(cx, ready)?; - /// Poll::Pending - /// } - /// Err(e) => Poll::Ready(Err(e)), - /// } - /// } - /// } - /// ``` - /// /// ## Platform-specific events /// /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. @@ -101,66 +69,14 @@ cfg_io_driver! { /// [`clear_write_ready`]: method@Self::clear_write_ready /// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_write_ready`]: method@Self::poll_write_ready - pub struct PollEvented<E: Evented> { + pub(crate) struct PollEvented<E: Evented> { io: Option<E>, - inner: Inner, + registration: Registration, } } -struct Inner { - registration: Registration, - - /// Currently visible read readiness - read_readiness: AtomicUsize, - - /// Currently visible write readiness - write_readiness: AtomicUsize, -} - // ===== impl PollEvented ===== -macro_rules! poll_ready { - ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{ - // Load cached & encoded readiness. - let mut cached = $me.inner.$cache.load(Relaxed); - let mask = $mask | platform::hup() | platform::error(); - - // See if the current readiness matches any bits. - let mut ret = mio::Ready::from_usize(cached) & $mask; - - if ret.is_empty() { - // Readiness does not match, consume the registration's readiness - // stream. This happens in a loop to ensure that the stream gets - // drained. - loop { - let ready = match $poll? { - Poll::Ready(v) => v, - Poll::Pending => return Poll::Pending, - }; - cached |= ready.as_usize(); - - // Update the cache store - $me.inner.$cache.store(cached, Relaxed); - - ret |= ready & mask; - - if !ret.is_empty() { - return Poll::Ready(Ok(ret)); - } - } - } else { - // Check what's new with the registration stream. This will not - // request to be notified - if let Some(ready) = $me.inner.registration.$take()? { - cached |= ready.as_usize(); - $me.inner.$cache.store(cached, Relaxed); - } - - Poll::Ready(Ok(mio::Ready::from_usize(cached))) - } - }}; -} - impl<E> PollEvented<E> where E: Evented, @@ -174,7 +90,8 @@ where /// The runtime is usually set implicitly when this |