summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2020-10-02 13:54:00 -0700
committerGitHub <noreply@github.com>2020-10-02 13:54:00 -0700
commit1e585ccb516c8dc7c13cbc3d50f8ca49260b9617 (patch)
tree00959b4ac82e4972314baa043cdbca2f2ebf5848
parent7ec6d88b21ea3e5531176f526a51dae0a4513025 (diff)
io: update to Mio 0.7 (#2893)
This also makes Mio an implementation detail, removing it from the public API. This is based on #1767.
-rw-r--r--tokio/Cargo.toml22
-rw-r--r--tokio/src/io/driver/mod.rs83
-rw-r--r--tokio/src/io/driver/ready.rs187
-rw-r--r--tokio/src/io/driver/scheduled_io.rs97
-rw-r--r--tokio/src/io/poll_evented.rs80
-rw-r--r--tokio/src/io/registration.rs37
-rw-r--r--tokio/src/net/tcp/listener.rs32
-rw-r--r--tokio/src/net/tcp/stream.rs239
-rw-r--r--tokio/src/net/udp/socket.rs33
-rw-r--r--tokio/src/net/unix/datagram/socket.rs50
-rw-r--r--tokio/src/net/unix/listener.rs44
-rw-r--r--tokio/src/net/unix/mod.rs3
-rw-r--r--tokio/src/net/unix/socketaddr.rs31
-rw-r--r--tokio/src/net/unix/stream.rs26
-rw-r--r--tokio/src/process/unix/mod.rs33
-rw-r--r--tokio/src/process/windows.rs2
-rw-r--r--tokio/src/signal/unix.rs2
-rw-r--r--tokio/src/signal/unix/driver.rs36
-rw-r--r--tokio/tests/uds_datagram.rs27
19 files changed, 476 insertions, 588 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 0b201795..fdce440f 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -58,9 +58,9 @@ net = ["dns", "tcp", "udp", "uds"]
process = [
"lazy_static",
"libc",
- "mio",
- "mio-named-pipes",
- "mio-uds",
+ "mio/os-poll",
+ "mio/os-util",
+ "mio/uds",
"signal-hook-registry",
"winapi/threadpoollegacyapiset",
]
@@ -74,18 +74,18 @@ rt-threaded = [
signal = [
"lazy_static",
"libc",
- "mio",
- "mio-uds",
+ "mio/os-poll",
+ "mio/uds",
"signal-hook-registry",
"winapi/consoleapi",
]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
-tcp = ["lazy_static", "mio"]
+tcp = ["lazy_static", "mio/tcp", "mio/os-poll"]
time = ["slab"]
-udp = ["lazy_static", "mio"]
-uds = ["lazy_static", "libc", "mio", "mio-uds"]
+udp = ["lazy_static", "mio/udp", "mio/os-poll"]
+uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"]
[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
@@ -98,20 +98,16 @@ fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
-mio = { version = "0.6.20", optional = true }
+mio = { version = "0.7.2", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full
[target.'cfg(unix)'.dependencies]
-mio-uds = { version = "0.6.5", optional = true }
libc = { version = "0.2.42", optional = true }
signal-hook-registry = { version = "1.1.1", optional = true }
-[target.'cfg(windows)'.dependencies]
-mio-named-pipes = { version = "0.1.6", optional = true }
-
[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.8"
default-features = false
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index 30b30203..c4f5887a 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -1,4 +1,5 @@
-pub(crate) mod platform;
+mod ready;
+use ready::Ready;
mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
@@ -8,7 +9,6 @@ use crate::runtime::context;
use crate::util::bit;
use crate::util::slab::{self, Slab};
-use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
@@ -27,10 +27,11 @@ pub(crate) struct Driver {
/// with this driver.
resources: Slab<ScheduledIo>,
+ /// The system event queue
+ poll: mio::Poll,
+
/// State shared between the reactor and the handles.
inner: Arc<Inner>,
-
- _wakeup_registration: mio::Registration,
}
/// A reference to an I/O driver
@@ -41,18 +42,18 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
- readiness: mio::Ready,
+ ready: Ready,
}
pub(super) struct Inner {
- /// The underlying system event queue.
- io: mio::Poll,
+ /// Registers I/O resources
+ registry: mio::Registry,
/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
/// Used to wake up the reactor from a call to `turn`
- wakeup: mio::SetReadiness,
+ waker: mio::Waker,
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
@@ -92,27 +93,22 @@ impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
- let io = mio::Poll::new()?;
- let wakeup_pair = mio::Registration::new2();
+ let poll = mio::Poll::new()?;
+ let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
+ let registry = poll.registry().try_clone()?;
+
let slab = Slab::new();
let allocator = slab.allocator();
- io.register(
- &wakeup_pair.0,
- TOKEN_WAKEUP,
- mio::Ready::readable(),
- mio::PollOpt::level(),
- )?;
-
Ok(Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
- _wakeup_registration: wakeup_pair.0,
+ poll,
inner: Arc::new(Inner {
- io,
+ registry,
io_dispatch: allocator,
- wakeup: wakeup_pair.1,
+ waker,
}),
})
}
@@ -143,23 +139,18 @@ impl Driver {
// Block waiting for an event to happen, peeling out how many events
// happened.
- match self.inner.io.poll(&mut events, max_wait) {
+ match self.poll.poll(&mut events, max_wait) {
Ok(_) => {}
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
// Process all the events that came in, dispatching appropriately
-
for event in events.iter() {
let token = event.token();
- if token == TOKEN_WAKEUP {
- self.inner
- .wakeup
- .set_readiness(mio::Ready::empty())
- .unwrap();
- } else {
- self.dispatch(token, event.readiness());
+ if token != TOKEN_WAKEUP {
+ self.dispatch(token, Ready::from_mio(event));
}
}
@@ -168,7 +159,7 @@ impl Driver {
Ok(())
}
- fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
+ fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
let io = match self.resources.get(addr) {
@@ -176,10 +167,9 @@ impl Driver {
None => return,
};
- let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
- curr | ready.as_usize()
- });
- if set.is_err() {
+ let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);
+
+ if res.is_err() {
// token no longer valid!
return;
}
@@ -194,7 +184,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
- io.wake(mio::Ready::all());
+ io.wake(Ready::ALL);
})
}
}
@@ -250,7 +240,7 @@ impl Handle {
/// return immediately.
fn wakeup(&self) {
if let Some(inner) = self.inner() {
- inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
+ inner.waker.wake().expect("failed to wake I/O driver");
}
}
@@ -279,8 +269,8 @@ impl Inner {
/// The registration token is returned.
pub(super) fn add_source(
&self,
- source: &dyn Evented,
- ready: mio::Ready,
+ source: &mut impl mio::event::Source,
+ interest: mio::Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
@@ -291,26 +281,23 @@ impl Inner {
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
- self.io
- .register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
+ self.registry
+ .register(source, mio::Token(token), interest)?;
Ok(shared)
}
/// Deregisters an I/O resource from the reactor.
- pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
- self.io.deregister(source)
+ pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
+ self.registry.deregister(source)
}
}
impl Direction {
- pub(super) fn mask(self) -> mio::Ready {
+ pub(super) fn mask(self) -> Ready {
match self {
- Direction::Read => {
- // Everything except writable is signaled through read.
- mio::Ready::all() - mio::Ready::writable()
- }
- Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
+ Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
+ Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
}
}
}
diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs
new file mode 100644
index 00000000..8b556e94
--- /dev/null
+++ b/tokio/src/io/driver/ready.rs
@@ -0,0 +1,187 @@
+use std::fmt;
+use std::ops;
+
+const READABLE: usize = 0b0_01;
+const WRITABLE: usize = 0b0_10;
+const READ_CLOSED: usize = 0b0_0100;
+const WRITE_CLOSED: usize = 0b0_1000;
+
+/// A set of readiness event kinds.
+///
+/// `Ready` is set of operation descriptors indicating which kind of an
+/// operation is ready to be performed.
+///
+/// This struct only represents portable event kinds. Portable events are
+/// events that can be raised on any platform while guaranteeing no false
+/// positives.
+#[derive(Clone, Copy, PartialEq, PartialOrd)]
+pub(crate) struct Ready(usize);
+
+impl Ready {
+ /// Returns the empty `Ready` set.
+ pub(crate) const EMPTY: Ready = Ready(0);
+
+ /// Returns a `Ready` representing readable readiness.
+ pub(crate) const READABLE: Ready = Ready(READABLE);
+
+ /// Returns a `Ready` representing writable readiness.
+ pub(crate) const WRITABLE: Ready = Ready(WRITABLE);
+
+ /// Returns a `Ready` representing read closed readiness.
+ pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED);
+
+ /// Returns a `Ready` representing write closed readiness.
+ pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
+
+ /// Returns a `Ready` representing readiness for all operations.
+ pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
+
+ pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
+ let mut ready = Ready::EMPTY;
+
+ if event.is_readable() {
+ ready |= Ready::READABLE;
+ }
+
+ if event.is_writable() {
+ ready |= Ready::WRITABLE;
+ }
+
+ if event.is_read_closed() {
+ ready |= Ready::READ_CLOSED;
+ }
+
+ if event.is_write_closed() {
+ ready |= Ready::WRITE_CLOSED;
+ }
+
+ ready
+ }
+
+ /// Returns true if `Ready` is the empty set
+ pub(crate) fn is_empty(self) -> bool {
+ self == Ready::EMPTY
+ }
+
+ /// Returns true if the value includes readable readiness
+ pub(crate) fn is_readable(self) -> bool {
+ self.contains(Ready::READABLE) || self.is_read_closed()
+ }
+
+ /// Returns true if the value includes writable readiness
+ pub(crate) fn is_writable(self) -> bool {
+ self.contains(Ready::WRITABLE) || self.is_write_closed()
+ }
+
+ /// Returns true if the value includes read closed readiness
+ pub(crate) fn is_read_closed(self) -> bool {
+ self.contains(Ready::READ_CLOSED)
+ }
+
+ /// Returns true if the value includes write closed readiness
+ pub(crate) fn is_write_closed(self) -> bool {
+ self.contains(Ready::WRITE_CLOSED)
+ }
+
+ /// Returns true if `self` is a superset of `other`.
+ ///
+ /// `other` may represent more than one readiness operations, in which case
+ /// the function only returns true if `self` contains all readiness
+ /// specified in `other`.
+ pub(crate) fn contains<T: Into<Self>>(self, other: T) -> bool {
+ let other = other.into();
+ (self & other) == other
+ }
+
+ /// Create a `Ready` instance using the given `usize` representation.
+ ///
+ /// The `usize` representation must have been obtained from a call to
+ /// `Readiness::as_usize`.
+ ///
+ /// This function is mainly provided to allow the caller to get a
+ /// readiness value from an `AtomicUsize`.
+ pub(crate) fn from_usize(val: usize) -> Ready {
+ Ready(val & Ready::ALL.as_usize())
+ }
+
+ /// Returns a `usize` representation of the `Ready` value.
+ ///
+ /// This function is mainly provided to allow the caller to store a
+ /// readiness value in an `AtomicUsize`.
+ pub(crate) fn as_usize(self) -> usize {
+ self.0
+ }
+}
+
+cfg_io_readiness! {
+ impl Ready {
+ pub(crate) fn from_interest(interest: mio::Interest) -> Ready {
+ let mut ready = Ready::EMPTY;
+
+ if interest.is_readable() {
+ ready |= Ready::READABLE;
+ ready |= Ready::READ_CLOSED;
+ }
+
+ if interest.is_writable() {
+ ready |= Ready::WRITABLE;
+ ready |= Ready::WRITE_CLOSED;
+ }
+
+ ready
+ }
+
+ pub(crate) fn intersection(self, interest: mio::Interest) -> Ready {
+ Ready(self.0 & Ready::from_interest(interest).0)
+ }
+
+ pub(crate) fn satisfies(self, interest: mio::Interest) -> bool {
+ self.0 & Ready::from_interest(interest).0 != 0
+ }
+ }
+}
+
+impl<T: Into<Ready>> ops::BitOr<T> for Ready {
+ type Output = Ready;
+
+ #[inline]
+ fn bitor(self, other: T) -> Ready {
+ Ready(self.0 | other.into().0)
+ }
+}
+
+impl<T: Into<Ready>> ops::BitOrAssign<T> for Ready {
+ #[inline]
+ fn bitor_assign(&mut self, other: T) {
+ self.0 |= other.into().0;
+ }
+}
+
+impl<T: Into<Ready>> ops::BitAnd<T> for Ready {
+ type Output = Ready;
+
+ #[inline]
+ fn bitand(self, other: T) -> Ready {
+ Ready(self.0 & other.into().0)
+ }
+}
+
+impl<T: Into<Ready>> ops::Sub<T> for Ready {
+ type Output = Ready;
+
+ #[inline]
+ fn sub(self, other: T) -> Ready {
+ Ready(self.0 & !other.into().0)
+ }
+}
+
+impl fmt::Debug for Ready {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Ready")
+ .field("is_readable", &self.is_readable())
+ .field("is_writable", &self.is_writable())
+ .field("is_read_closed", &self.is_read_closed())
+ .field("is_write_closed", &self.is_write_closed())
+ .finish()
+ }
+}
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
index f63fd7ab..bdf21798 100644
--- a/tokio/src/io/driver/scheduled_io.rs
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -1,4 +1,4 @@
-use super::{platform, Direction, ReadyEvent, Tick};
+use super::{Direction, Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
@@ -52,7 +52,7 @@ cfg_io_readiness! {
waker: Option<Waker>,
/// The interest this waiter is waiting on
- interest: mio::Ready,
+ interest: mio::Interest,
is_ready: bool,
@@ -141,8 +141,8 @@ impl ScheduledIo {
&self,
token: Option<usize>,
tick: Tick,
- f: impl Fn(usize) -> usize,
- ) -> Result<usize, ()> {
+ f: impl Fn(Ready) -> Ready,
+ ) -> Result<(), ()> {
let mut current = self.readiness.load(Acquire);
loop {
@@ -158,52 +158,46 @@ impl ScheduledIo {
// Mask out the tick/generation bits so that the modifying
// function doesn't see them.
- let current_readiness = current & mio::Ready::all().as_usize();
- let mut new = f(current_readiness);
+ let current_readiness = Ready::from_usize(current);
+ let new = f(current_readiness);
- debug_assert!(
- new <= READINESS.max_value(),
- "new readiness value would overwrite tick/generation bits!"
- );
-
- match tick {
- Tick::Set(t) => {
- new = TICK.pack(t as usize, new);
- }
+ let packed = match tick {
+ Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
Tick::Clear(t) => {
if TICK.unpack(current) as u8 != t {
// Trying to clear readiness with an old event!
return Err(());
}
- new = TICK.pack(t as usize, new);
+
+ TICK.pack(t as usize, new.as_usize())
}
- }
+ };
- new = GENERATION.pack(current_generation, new);
+ let next = GENERATION.pack(current_generation, packed);
match self
.readiness
- .compare_exchange(current, new, AcqRel, Acquire)
+ .compare_exchange(current, next, AcqRel, Acquire)
{
- Ok(_) => return Ok(current),
+ Ok(_) => return Ok(()),
// we lost the race, retry!
Err(actual) => current = actual,
}
}
}
- pub(super) fn wake(&self, ready: mio::Ready) {
+ pub(super) fn wake(&self, ready: Ready) {
let mut waiters = self.waiters.lock();
// check for AsyncRead slot
- if !(ready & (!mio::Ready::writable())).is_empty() {
+ if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
waker.wake();
}
}
// check for AsyncWrite slot
- if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
+ if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
waker.wake();
}
@@ -212,10 +206,7 @@ impl ScheduledIo {
#[cfg(any(feature = "udp", feature = "uds"))]
{
// check list of waiters
- for waiter in waiters
- .list
- .drain_filter(|w| !(w.interest & ready).is_empty())
- {
+ for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) {
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
@@ -237,7 +228,7 @@ impl ScheduledIo {
) -> Poll<ReadyEvent> {
let curr = self.readiness.load(Acquire);
- let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr));
+ let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
if ready.is_empty() {
// Update the task info
@@ -251,50 +242,36 @@ impl ScheduledIo {
// Try again, in case the readiness was changed while we were
// taking the waiters lock
let curr = self.readiness.load(Acquire);
- let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr));
+ let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
- readiness: ready,
+ ready,
})
}
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
- readiness: ready,
+ ready,
})
}
}
pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
- // This consumes the current readiness state **except** for HUP and
- // error. HUP and error are excluded because a) they are final states
- // and never transition out and b) both the read AND the write
- // directions need to be able to obvserve these states.
- //
- // # Platform-specific behavior
- //
- // HUP and error readiness are platform-specific. On epoll platforms,
- // HUP has specific conditions that must be met by both peers of a
- // connection in order to be triggered.
- //
- // On epoll platforms, `EPOLLERR` is signaled through
- // `UnixReady::error()` and is important to be observable by both read
- // AND write. A specific case that `EPOLLERR` occurs is when the read
- // end of a pipe is closed. When this occurs, a peer blocked by
- // writing to the pipe should be notified.
- let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize();
+ // This consumes the current readiness state **except** for closed
+ // states. Closed states are excluded because they are final states.
+ let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
// result isn't important
- let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup));
+ let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
}
}
impl Drop for ScheduledIo {
fn drop(&mut self) {
- self.wake(mio::Ready::all());
+ self.wake(Ready::ALL);
}
}
@@ -304,7 +281,7 @@ unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
/// An async version of `poll_readiness` which uses a linked list of wakers
- pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent {
+ pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
@@ -312,7 +289,7 @@ cfg_io_readiness! {
// we are borrowing the `UnsafeCell` possibly over await boundaries.
//
// Go figure.
- fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> {
+ fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> {
Readiness {
scheduled_io: self,
state: State::Init,
@@ -362,29 +339,31 @@ cfg_io_readiness! {
State::Init => {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
- let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
+ let ready = Ready::from_usize(READINESS.unpack(curr));
// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
+ let ready = ready.intersection(interest);
- if readiness.contains(interest) {
+ if !ready.is_empty() {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
- return Poll::Ready(ReadyEvent { readiness: interest, tick });
+ return Poll::Ready(ReadyEvent { ready, tick });
}
// Wasn't ready, take the lock (and check again while locked).
let mut waiters = scheduled_io.waiters.lock();
let curr = scheduled_io.readiness.load(SeqCst);
- let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
+ let ready = Ready::from_usize(READINESS.unpack(curr));
+ let ready = ready.intersection(interest);
- if readiness.contains(interest) {
+ if !ready.is_empty() {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
- return Poll::Ready(ReadyEvent { readiness, tick });
+ return Poll::Ready(ReadyEvent { ready, tick });
}
// Not ready even after locked, insert into list...
@@ -440,7 +419,7 @@ cfg_io_readiness! {
return Poll::Ready(ReadyEvent {
tick,
- readiness: w.interest,
+ ready: Ready::from_interest(w.interest),
});
}
}
diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs
index 2c943ea4..4457195f 100644
--- a/tokio/src/io/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -2,7 +2,7 @@ use crate::io::driver::{Direction, Handle, ReadyEvent};
use crate::io::registration::Registration;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
-use mio::event::Evented;
+use mio::event::Source;
use std::fmt;
use std::io::{self, Read, Write};
use std::marker::Unpin;
@@ -69,7 +69,7 @@ cfg_io_driver! {
/// [`clear_write_ready`]: method@Self::clear_write_ready
/// [`poll_read_ready`]: method@Self::poll_read_ready
/// [`poll_write_ready`]: method@Self::poll_write_ready
- pub(crate) struct PollEvented<E: Evented> {
+ pub(crate) struct PollEvented<E: Source> {
io: Option<E>,
registration: Registration,
}
@@ -77,10 +77,7 @@ cfg_io_driver! {
// ===== impl PollEvented =====
-impl<E> PollEvented<E>
-where
- E: Evented,
-{
+impl<E: Source> PollEvented<E> {
/// Creates a new `PollEvented` associated with the default reactor.
///
/// # Panics
@@ -92,26 +89,15 @@ where
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new(io: E) -> io::Result<Self> {
- PollEvented::new_with_ready(io, mio::Ready::all())
+ PollEvented::new_with_interest(io, mio::Interest::READABLE | mio::Interest::WRITABLE)
}
- /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Ready`
- /// state. `new_with_ready` should be used over `new` when you need control over the readiness
+ /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Interest`
+ /// state. `new_with_interest` should be used over `new` when you need control over the readiness
/// state, such as when a file descriptor only allows reads. This does not add `hup` or `error`
/// so if you are interested in those states, you will need to add them to the readiness state
/// passed to this function.
///
- /// An example to listen to read only
- ///
- /// ```rust
- /// ##[cfg(unix)]
- /// mio::Ready::from_usize(
- /// mio::Ready::readable().as_usize()
- /// | mio::unix::UnixReady::error().as_usize()
- /// | mio::unix::UnixReady::hup().as_usize()
- /// );
- /// ```
- ///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
@@ -120,16 +106,16 @@ where
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
#[cfg_attr(feature = "signal", allow(unused))]
- pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
- Self::new_with_ready_and_handle(io, ready, Handle::current())
+ pub(crate) fn new_with_interest(io: E, interest: mio::Interest) -> io::Result<Self> {
+ Self::new_with_interest_and_handle(io, interest, Handle::current())
}
- pub(crate) fn new_with_ready_and_handle(
- io: E,
- ready: mio::Ready,
+ pub(crate) fn new_with_interest_and_handle(
+ mut io: E,
+ interest: mio::Interest,
handle: Handle,
) -> io::Result<Self> {
- let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?;
+ let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self {
io: Some(io),
registration,
@@ -155,21 +141,6 @@ where
self.io.as_mut().unwrap()
}
- /// Consumes self, returning the inner I/O object
- ///
- /// This function will deregister the I/O resource from the reactor before
- /// returning. If the deregistration operation fails, an error is returned.
- ///
- /// Note that deregistering does not guarantee that the I/O resource can be
- /// registered with a different reactor. Some I/O resource types can only be
- /// associated with a single reactor instance for their lifetime.
- #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
- pub(crate) fn into_inner(mut self) -> io::Result<E> {
- let io = self.io.take().unwrap();
- self.registration.deregister(&io)?;
- Ok(io)
- }
-
pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
self.registration.clear_readiness(event);
}
@@ -234,15 +205,12 @@ where
}
cfg_io_readiness! {
- impl<E> PollEvented<E>
- where
- E: Evented,
- {
- pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> {
+ impl<E: Source> PollEvented<E> {
+ pub(crate) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> {
self.registration.readiness(interest).await
}
- pub(crate) async fn async_io<F, R>(&self, interest: mio::Ready, mut op: F) -> io::Result<R>
+ pub(crate) async fn async_io<F, R>(&self, interest: mio::Interest, mut op: F) -> io::Result<R>
where
F: FnMut(&E) -> io::Result<R>,
{
@@ -262,10 +230,7 @@ cfg_io_readiness! {
// ===== Read / Write impls =====
-impl<E> AsyncRead for PollEvented<E>
-where
- E: Evented + Read + Unpin,
-{
+impl<E: Source + Read + Unpin> AsyncRead for PollEvented<E> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -290,10 +255,7 @@ where
}
}
-impl<E> AsyncWrite for PollEvented<E>
-where
- E: Evented + Write + Unpin,
-{
+impl<E: Source + Write + Unpin> AsyncWrite for PollEvented<E> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -340,17 +302,17 @@ fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
}
}
-impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
+impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollEvented").field("io", &self.io).finish()
}
}
-impl<E: Evented> Drop for PollEvented<E> {
+impl<E: Source> Drop for PollEvented<E> {
fn drop(&mut self) {
- if let Some(io) = self.io.take() {
+ if let Some(mut io) = self.io.take() {
// Ignore errors
- let _ = self.registration.deregister(&io);
+ let _ = self.registration.deregister(&mut io);
}
}
}
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index e4ec096f..03221b60 100644
--- a/tokio/src/io/registration.rs
+++ b/