diff options
author | Carl Lerche <me@carllerche.com> | 2020-10-02 13:54:00 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-02 13:54:00 -0700 |
commit | 1e585ccb516c8dc7c13cbc3d50f8ca49260b9617 (patch) | |
tree | 00959b4ac82e4972314baa043cdbca2f2ebf5848 | |
parent | 7ec6d88b21ea3e5531176f526a51dae0a4513025 (diff) |
io: update to Mio 0.7 (#2893)
This also makes Mio an implementation detail, removing it from the
public API.
This is based on #1767.
-rw-r--r-- | tokio/Cargo.toml | 22 | ||||
-rw-r--r-- | tokio/src/io/driver/mod.rs | 83 | ||||
-rw-r--r-- | tokio/src/io/driver/ready.rs | 187 | ||||
-rw-r--r-- | tokio/src/io/driver/scheduled_io.rs | 97 | ||||
-rw-r--r-- | tokio/src/io/poll_evented.rs | 80 | ||||
-rw-r--r-- | tokio/src/io/registration.rs | 37 | ||||
-rw-r--r-- | tokio/src/net/tcp/listener.rs | 32 | ||||
-rw-r--r-- | tokio/src/net/tcp/stream.rs | 239 | ||||
-rw-r--r-- | tokio/src/net/udp/socket.rs | 33 | ||||
-rw-r--r-- | tokio/src/net/unix/datagram/socket.rs | 50 | ||||
-rw-r--r-- | tokio/src/net/unix/listener.rs | 44 | ||||
-rw-r--r-- | tokio/src/net/unix/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/net/unix/socketaddr.rs | 31 | ||||
-rw-r--r-- | tokio/src/net/unix/stream.rs | 26 | ||||
-rw-r--r-- | tokio/src/process/unix/mod.rs | 33 | ||||
-rw-r--r-- | tokio/src/process/windows.rs | 2 | ||||
-rw-r--r-- | tokio/src/signal/unix.rs | 2 | ||||
-rw-r--r-- | tokio/src/signal/unix/driver.rs | 36 | ||||
-rw-r--r-- | tokio/tests/uds_datagram.rs | 27 |
19 files changed, 476 insertions, 588 deletions
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 0b201795..fdce440f 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -58,9 +58,9 @@ net = ["dns", "tcp", "udp", "uds"] process = [ "lazy_static", "libc", - "mio", - "mio-named-pipes", - "mio-uds", + "mio/os-poll", + "mio/os-util", + "mio/uds", "signal-hook-registry", "winapi/threadpoollegacyapiset", ] @@ -74,18 +74,18 @@ rt-threaded = [ signal = [ "lazy_static", "libc", - "mio", - "mio-uds", + "mio/os-poll", + "mio/uds", "signal-hook-registry", "winapi/consoleapi", ] stream = ["futures-core"] sync = ["fnv"] test-util = [] -tcp = ["lazy_static", "mio"] +tcp = ["lazy_static", "mio/tcp", "mio/os-poll"] time = ["slab"] -udp = ["lazy_static", "mio"] -uds = ["lazy_static", "libc", "mio", "mio-uds"] +udp = ["lazy_static", "mio/udp", "mio/os-poll"] +uds = ["lazy_static", "libc", "mio/uds", "mio/os-poll"] [dependencies] tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true } @@ -98,20 +98,16 @@ fnv = { version = "1.0.6", optional = true } futures-core = { version = "0.3.0", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } -mio = { version = "0.6.20", optional = true } +mio = { version = "0.7.2", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.11.0", optional = true } # Not in full slab = { version = "0.4.1", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full [target.'cfg(unix)'.dependencies] -mio-uds = { version = "0.6.5", optional = true } libc = { version = "0.2.42", optional = true } signal-hook-registry = { version = "1.1.1", optional = true } -[target.'cfg(windows)'.dependencies] -mio-named-pipes = { version = "0.1.6", optional = true } - [target.'cfg(windows)'.dependencies.winapi] version = "0.3.8" default-features = false diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 30b30203..c4f5887a 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,4 +1,5 @@ -pub(crate) mod platform; +mod ready; +use ready::Ready; mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests @@ -8,7 +9,6 @@ use crate::runtime::context; use crate::util::bit; use crate::util::slab::{self, Slab}; -use mio::event::Evented; use std::fmt; use std::io; use std::sync::{Arc, Weak}; @@ -27,10 +27,11 @@ pub(crate) struct Driver { /// with this driver. resources: Slab<ScheduledIo>, + /// The system event queue + poll: mio::Poll, + /// State shared between the reactor and the handles. inner: Arc<Inner>, - - _wakeup_registration: mio::Registration, } /// A reference to an I/O driver @@ -41,18 +42,18 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, - readiness: mio::Ready, + ready: Ready, } pub(super) struct Inner { - /// The underlying system event queue. - io: mio::Poll, + /// Registers I/O resources + registry: mio::Registry, /// Allocates `ScheduledIo` handles when creating new resources. pub(super) io_dispatch: slab::Allocator<ScheduledIo>, /// Used to wake up the reactor from a call to `turn` - wakeup: mio::SetReadiness, + waker: mio::Waker, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -92,27 +93,22 @@ impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. pub(crate) fn new() -> io::Result<Driver> { - let io = mio::Poll::new()?; - let wakeup_pair = mio::Registration::new2(); + let poll = mio::Poll::new()?; + let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; + let registry = poll.registry().try_clone()?; + let slab = Slab::new(); let allocator = slab.allocator(); - io.register( - &wakeup_pair.0, - TOKEN_WAKEUP, - mio::Ready::readable(), - mio::PollOpt::level(), - )?; - Ok(Driver { tick: 0, events: Some(mio::Events::with_capacity(1024)), resources: slab, - _wakeup_registration: wakeup_pair.0, + poll, inner: Arc::new(Inner { - io, + registry, io_dispatch: allocator, - wakeup: wakeup_pair.1, + waker, }), }) } @@ -143,23 +139,18 @@ impl Driver { // Block waiting for an event to happen, peeling out how many events // happened. - match self.inner.io.poll(&mut events, max_wait) { + match self.poll.poll(&mut events, max_wait) { Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } // Process all the events that came in, dispatching appropriately - for event in events.iter() { let token = event.token(); - if token == TOKEN_WAKEUP { - self.inner - .wakeup - .set_readiness(mio::Ready::empty()) - .unwrap(); - } else { - self.dispatch(token, event.readiness()); + if token != TOKEN_WAKEUP { + self.dispatch(token, Ready::from_mio(event)); } } @@ -168,7 +159,7 @@ impl Driver { Ok(()) } - fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { + fn dispatch(&mut self, token: mio::Token, ready: Ready) { let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); let io = match self.resources.get(addr) { @@ -176,10 +167,9 @@ impl Driver { None => return, }; - let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| { - curr | ready.as_usize() - }); - if set.is_err() { + let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready); + + if res.is_err() { // token no longer valid! return; } @@ -194,7 +184,7 @@ impl Drop for Driver { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the // driver being shutdown. - io.wake(mio::Ready::all()); + io.wake(Ready::ALL); }) } } @@ -250,7 +240,7 @@ impl Handle { /// return immediately. fn wakeup(&self) { if let Some(inner) = self.inner() { - inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + inner.waker.wake().expect("failed to wake I/O driver"); } } @@ -279,8 +269,8 @@ impl Inner { /// The registration token is returned. pub(super) fn add_source( &self, - source: &dyn Evented, - ready: mio::Ready, + source: &mut impl mio::event::Source, + interest: mio::Interest, ) -> io::Result<slab::Ref<ScheduledIo>> { let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { io::Error::new( @@ -291,26 +281,23 @@ impl Inner { let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); - self.io - .register(source, mio::Token(token), ready, mio::PollOpt::edge())?; + self.registry + .register(source, mio::Token(token), interest)?; Ok(shared) } /// Deregisters an I/O resource from the reactor. - pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { - self.io.deregister(source) + pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { + self.registry.deregister(source) } } impl Direction { - pub(super) fn mask(self) -> mio::Ready { + pub(super) fn mask(self) -> Ready { match self { - Direction::Read => { - // Everything except writable is signaled through read. - mio::Ready::all() - mio::Ready::writable() - } - Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(), + Direction::Read => Ready::READABLE | Ready::READ_CLOSED, + Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, } } } diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs new file mode 100644 index 00000000..8b556e94 --- /dev/null +++ b/tokio/src/io/driver/ready.rs @@ -0,0 +1,187 @@ +use std::fmt; +use std::ops; + +const READABLE: usize = 0b0_01; +const WRITABLE: usize = 0b0_10; +const READ_CLOSED: usize = 0b0_0100; +const WRITE_CLOSED: usize = 0b0_1000; + +/// A set of readiness event kinds. +/// +/// `Ready` is set of operation descriptors indicating which kind of an +/// operation is ready to be performed. +/// +/// This struct only represents portable event kinds. Portable events are +/// events that can be raised on any platform while guaranteeing no false +/// positives. +#[derive(Clone, Copy, PartialEq, PartialOrd)] +pub(crate) struct Ready(usize); + +impl Ready { + /// Returns the empty `Ready` set. + pub(crate) const EMPTY: Ready = Ready(0); + + /// Returns a `Ready` representing readable readiness. + pub(crate) const READABLE: Ready = Ready(READABLE); + + /// Returns a `Ready` representing writable readiness. + pub(crate) const WRITABLE: Ready = Ready(WRITABLE); + + /// Returns a `Ready` representing read closed readiness. + pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED); + + /// Returns a `Ready` representing write closed readiness. + pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + + /// Returns a `Ready` representing readiness for all operations. + pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + + pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { + let mut ready = Ready::EMPTY; + + if event.is_readable() { + ready |= Ready::READABLE; + } + + if event.is_writable() { + ready |= Ready::WRITABLE; + } + + if event.is_read_closed() { + ready |= Ready::READ_CLOSED; + } + + if event.is_write_closed() { + ready |= Ready::WRITE_CLOSED; + } + + ready + } + + /// Returns true if `Ready` is the empty set + pub(crate) fn is_empty(self) -> bool { + self == Ready::EMPTY + } + + /// Returns true if the value includes readable readiness + pub(crate) fn is_readable(self) -> bool { + self.contains(Ready::READABLE) || self.is_read_closed() + } + + /// Returns true if the value includes writable readiness + pub(crate) fn is_writable(self) -> bool { + self.contains(Ready::WRITABLE) || self.is_write_closed() + } + + /// Returns true if the value includes read closed readiness + pub(crate) fn is_read_closed(self) -> bool { + self.contains(Ready::READ_CLOSED) + } + + /// Returns true if the value includes write closed readiness + pub(crate) fn is_write_closed(self) -> bool { + self.contains(Ready::WRITE_CLOSED) + } + + /// Returns true if `self` is a superset of `other`. + /// + /// `other` may represent more than one readiness operations, in which case + /// the function only returns true if `self` contains all readiness + /// specified in `other`. + pub(crate) fn contains<T: Into<Self>>(self, other: T) -> bool { + let other = other.into(); + (self & other) == other + } + + /// Create a `Ready` instance using the given `usize` representation. + /// + /// The `usize` representation must have been obtained from a call to + /// `Readiness::as_usize`. + /// + /// This function is mainly provided to allow the caller to get a + /// readiness value from an `AtomicUsize`. + pub(crate) fn from_usize(val: usize) -> Ready { + Ready(val & Ready::ALL.as_usize()) + } + + /// Returns a `usize` representation of the `Ready` value. + /// + /// This function is mainly provided to allow the caller to store a + /// readiness value in an `AtomicUsize`. + pub(crate) fn as_usize(self) -> usize { + self.0 + } +} + +cfg_io_readiness! { + impl Ready { + pub(crate) fn from_interest(interest: mio::Interest) -> Ready { + let mut ready = Ready::EMPTY; + + if interest.is_readable() { + ready |= Ready::READABLE; + ready |= Ready::READ_CLOSED; + } + + if interest.is_writable() { + ready |= Ready::WRITABLE; + ready |= Ready::WRITE_CLOSED; + } + + ready + } + + pub(crate) fn intersection(self, interest: mio::Interest) -> Ready { + Ready(self.0 & Ready::from_interest(interest).0) + } + + pub(crate) fn satisfies(self, interest: mio::Interest) -> bool { + self.0 & Ready::from_interest(interest).0 != 0 + } + } +} + +impl<T: Into<Ready>> ops::BitOr<T> for Ready { + type Output = Ready; + + #[inline] + fn bitor(self, other: T) -> Ready { + Ready(self.0 | other.into().0) + } +} + +impl<T: Into<Ready>> ops::BitOrAssign<T> for Ready { + #[inline] + fn bitor_assign(&mut self, other: T) { + self.0 |= other.into().0; + } +} + +impl<T: Into<Ready>> ops::BitAnd<T> for Ready { + type Output = Ready; + + #[inline] + fn bitand(self, other: T) -> Ready { + Ready(self.0 & other.into().0) + } +} + +impl<T: Into<Ready>> ops::Sub<T> for Ready { + type Output = Ready; + + #[inline] + fn sub(self, other: T) -> Ready { + Ready(self.0 & !other.into().0) + } +} + +impl fmt::Debug for Ready { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Ready") + .field("is_readable", &self.is_readable()) + .field("is_writable", &self.is_writable()) + .field("is_read_closed", &self.is_read_closed()) + .field("is_write_closed", &self.is_write_closed()) + .finish() + } +} diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index f63fd7ab..bdf21798 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{platform, Direction, ReadyEvent, Tick}; +use super::{Direction, Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -52,7 +52,7 @@ cfg_io_readiness! { waker: Option<Waker>, /// The interest this waiter is waiting on - interest: mio::Ready, + interest: mio::Interest, is_ready: bool, @@ -141,8 +141,8 @@ impl ScheduledIo { &self, token: Option<usize>, tick: Tick, - f: impl Fn(usize) -> usize, - ) -> Result<usize, ()> { + f: impl Fn(Ready) -> Ready, + ) -> Result<(), ()> { let mut current = self.readiness.load(Acquire); loop { @@ -158,52 +158,46 @@ impl ScheduledIo { // Mask out the tick/generation bits so that the modifying // function doesn't see them. - let current_readiness = current & mio::Ready::all().as_usize(); - let mut new = f(current_readiness); + let current_readiness = Ready::from_usize(current); + let new = f(current_readiness); - debug_assert!( - new <= READINESS.max_value(), - "new readiness value would overwrite tick/generation bits!" - ); - - match tick { - Tick::Set(t) => { - new = TICK.pack(t as usize, new); - } + let packed = match tick { + Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), Tick::Clear(t) => { if TICK.unpack(current) as u8 != t { // Trying to clear readiness with an old event! return Err(()); } - new = TICK.pack(t as usize, new); + + TICK.pack(t as usize, new.as_usize()) } - } + }; - new = GENERATION.pack(current_generation, new); + let next = GENERATION.pack(current_generation, packed); match self .readiness - .compare_exchange(current, new, AcqRel, Acquire) + .compare_exchange(current, next, AcqRel, Acquire) { - Ok(_) => return Ok(current), + Ok(_) => return Ok(()), // we lost the race, retry! Err(actual) => current = actual, } } } - pub(super) fn wake(&self, ready: mio::Ready) { + pub(super) fn wake(&self, ready: Ready) { let mut waiters = self.waiters.lock(); // check for AsyncRead slot - if !(ready & (!mio::Ready::writable())).is_empty() { + if ready.is_readable() { if let Some(waker) = waiters.reader.take() { waker.wake(); } } // check for AsyncWrite slot - if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { + if ready.is_writable() { if let Some(waker) = waiters.writer.take() { waker.wake(); } @@ -212,10 +206,7 @@ impl ScheduledIo { #[cfg(any(feature = "udp", feature = "uds"))] { // check list of waiters - for waiter in waiters - .list - .drain_filter(|w| !(w.interest & ready).is_empty()) - { + for waiter in waiters.list.drain_filter(|w| ready.satisfies(w.interest)) { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { waiter.is_ready = true; @@ -237,7 +228,7 @@ impl ScheduledIo { ) -> Poll<ReadyEvent> { let curr = self.readiness.load(Acquire); - let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); if ready.is_empty() { // Update the task info @@ -251,50 +242,36 @@ impl ScheduledIo { // Try again, in case the readiness was changed while we were // taking the waiters lock let curr = self.readiness.load(Acquire); - let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); if ready.is_empty() { Poll::Pending } else { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, - readiness: ready, + ready, }) } } else { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, - readiness: ready, + ready, }) } } pub(crate) fn clear_readiness(&self, event: ReadyEvent) { - // This consumes the current readiness state **except** for HUP and - // error. HUP and error are excluded because a) they are final states - // and never transition out and b) both the read AND the write - // directions need to be able to obvserve these states. - // - // # Platform-specific behavior - // - // HUP and error readiness are platform-specific. On epoll platforms, - // HUP has specific conditions that must be met by both peers of a - // connection in order to be triggered. - // - // On epoll platforms, `EPOLLERR` is signaled through - // `UnixReady::error()` and is important to be observable by both read - // AND write. A specific case that `EPOLLERR` occurs is when the read - // end of a pipe is closed. When this occurs, a peer blocked by - // writing to the pipe should be notified. - let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize(); + // This consumes the current readiness state **except** for closed + // states. Closed states are excluded because they are final states. + let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; // result isn't important - let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup)); + let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed); } } impl Drop for ScheduledIo { fn drop(&mut self) { - self.wake(mio::Ready::all()); + self.wake(Ready::ALL); } } @@ -304,7 +281,7 @@ unsafe impl Sync for ScheduledIo {} cfg_io_readiness! { impl ScheduledIo { /// An async version of `poll_readiness` which uses a linked list of wakers - pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { + pub(crate) async fn readiness(&self, interest: mio::Interest) -> ReadyEvent { self.readiness_fut(interest).await } @@ -312,7 +289,7 @@ cfg_io_readiness! { // we are borrowing the `UnsafeCell` possibly over await boundaries. // // Go figure. - fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { + fn readiness_fut(&self, interest: mio::Interest) -> Readiness<'_> { Readiness { scheduled_io: self, state: State::Init, @@ -362,29 +339,31 @@ cfg_io_readiness! { State::Init => { // Optimistically check existing readiness let curr = scheduled_io.readiness.load(SeqCst); - let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = Ready::from_usize(READINESS.unpack(curr)); // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; + let ready = ready.intersection(interest); - if readiness.contains(interest) { + if !ready.is_empty() { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { readiness: interest, tick }); + return Poll::Ready(ReadyEvent { ready, tick }); } // Wasn't ready, take the lock (and check again while locked). let mut waiters = scheduled_io.waiters.lock(); let curr = scheduled_io.readiness.load(SeqCst); - let readiness = mio::Ready::from_usize(READINESS.unpack(curr)); + let ready = Ready::from_usize(READINESS.unpack(curr)); + let ready = ready.intersection(interest); - if readiness.contains(interest) { + if !ready.is_empty() { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { readiness, tick }); + return Poll::Ready(ReadyEvent { ready, tick }); } // Not ready even after locked, insert into list... @@ -440,7 +419,7 @@ cfg_io_readiness! { return Poll::Ready(ReadyEvent { tick, - readiness: w.interest, + ready: Ready::from_interest(w.interest), }); } } diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 2c943ea4..4457195f 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -2,7 +2,7 @@ use crate::io::driver::{Direction, Handle, ReadyEvent}; use crate::io::registration::Registration; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; -use mio::event::Evented; +use mio::event::Source; use std::fmt; use std::io::{self, Read, Write}; use std::marker::Unpin; @@ -69,7 +69,7 @@ cfg_io_driver! { /// [`clear_write_ready`]: method@Self::clear_write_ready /// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_write_ready`]: method@Self::poll_write_ready - pub(crate) struct PollEvented<E: Evented> { + pub(crate) struct PollEvented<E: Source> { io: Option<E>, registration: Registration, } @@ -77,10 +77,7 @@ cfg_io_driver! { // ===== impl PollEvented ===== -impl<E> PollEvented<E> -where - E: Evented, -{ +impl<E: Source> PollEvented<E> { /// Creates a new `PollEvented` associated with the default reactor. /// /// # Panics @@ -92,26 +89,15 @@ where /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new(io: E) -> io::Result<Self> { - PollEvented::new_with_ready(io, mio::Ready::all()) + PollEvented::new_with_interest(io, mio::Interest::READABLE | mio::Interest::WRITABLE) } - /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Ready` - /// state. `new_with_ready` should be used over `new` when you need control over the readiness + /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Interest` + /// state. `new_with_interest` should be used over `new` when you need control over the readiness /// state, such as when a file descriptor only allows reads. This does not add `hup` or `error` /// so if you are interested in those states, you will need to add them to the readiness state /// passed to this function. /// - /// An example to listen to read only - /// - /// ```rust - /// ##[cfg(unix)] - /// mio::Ready::from_usize( - /// mio::Ready::readable().as_usize() - /// | mio::unix::UnixReady::error().as_usize() - /// | mio::unix::UnixReady::hup().as_usize() - /// ); - /// ``` - /// /// # Panics /// /// This function panics if thread-local runtime is not set. @@ -120,16 +106,16 @@ where /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. #[cfg_attr(feature = "signal", allow(unused))] - pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> { - Self::new_with_ready_and_handle(io, ready, Handle::current()) + pub(crate) fn new_with_interest(io: E, interest: mio::Interest) -> io::Result<Self> { + Self::new_with_interest_and_handle(io, interest, Handle::current()) } - pub(crate) fn new_with_ready_and_handle( - io: E, - ready: mio::Ready, + pub(crate) fn new_with_interest_and_handle( + mut io: E, + interest: mio::Interest, handle: Handle, ) -> io::Result<Self> { - let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?; + let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; Ok(Self { io: Some(io), registration, @@ -155,21 +141,6 @@ where self.io.as_mut().unwrap() } - /// Consumes self, returning the inner I/O object - /// - /// This function will deregister the I/O resource from the reactor before - /// returning. If the deregistration operation fails, an error is returned. - /// - /// Note that deregistering does not guarantee that the I/O resource can be - /// registered with a different reactor. Some I/O resource types can only be - /// associated with a single reactor instance for their lifetime. - #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] - pub(crate) fn into_inner(mut self) -> io::Result<E> { - let io = self.io.take().unwrap(); - self.registration.deregister(&io)?; - Ok(io) - } - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { self.registration.clear_readiness(event); } @@ -234,15 +205,12 @@ where } cfg_io_readiness! { - impl<E> PollEvented<E> - where - E: Evented, - { - pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> { + impl<E: Source> PollEvented<E> { + pub(crate) async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyEvent> { self.registration.readiness(interest).await } - pub(crate) async fn async_io<F, R>(&self, interest: mio::Ready, mut op: F) -> io::Result<R> + pub(crate) async fn async_io<F, R>(&self, interest: mio::Interest, mut op: F) -> io::Result<R> where F: FnMut(&E) -> io::Result<R>, { @@ -262,10 +230,7 @@ cfg_io_readiness! { // ===== Read / Write impls ===== -impl<E> AsyncRead for PollEvented<E> -where - E: Evented + Read + Unpin, -{ +impl<E: Source + Read + Unpin> AsyncRead for PollEvented<E> { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -290,10 +255,7 @@ where } } -impl<E> AsyncWrite for PollEvented<E> -where - E: Evented + Write + Unpin, -{ +impl<E: Source + Write + Unpin> AsyncWrite for PollEvented<E> { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -340,17 +302,17 @@ fn is_wouldblock<T>(r: &io::Result<T>) -> bool { } } -impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> { +impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PollEvented").field("io", &self.io).finish() } } -impl<E: Evented> Drop for PollEvented<E> { +impl<E: Source> Drop for PollEvented<E> { fn drop(&mut self) { - if let Some(io) = self.io.take() { + if let Some(mut io) = self.io.take() { // Ignore errors - let _ = self.registration.deregister(&io); + let _ = self.registration.deregister(&mut io); } } } diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index e4ec096f..03221b60 100644 --- a/tokio/src/io/registration.rs +++ b/ |