diff options
Diffstat (limited to 'tokio/src/io/driver/mod.rs')
-rw-r--r-- | tokio/src/io/driver/mod.rs | 83 |
1 files changed, 35 insertions, 48 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 30b30203..c4f5887a 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,4 +1,5 @@ -pub(crate) mod platform; +mod ready; +use ready::Ready; mod scheduled_io; pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests @@ -8,7 +9,6 @@ use crate::runtime::context; use crate::util::bit; use crate::util::slab::{self, Slab}; -use mio::event::Evented; use std::fmt; use std::io; use std::sync::{Arc, Weak}; @@ -27,10 +27,11 @@ pub(crate) struct Driver { /// with this driver. resources: Slab<ScheduledIo>, + /// The system event queue + poll: mio::Poll, + /// State shared between the reactor and the handles. inner: Arc<Inner>, - - _wakeup_registration: mio::Registration, } /// A reference to an I/O driver @@ -41,18 +42,18 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, - readiness: mio::Ready, + ready: Ready, } pub(super) struct Inner { - /// The underlying system event queue. - io: mio::Poll, + /// Registers I/O resources + registry: mio::Registry, /// Allocates `ScheduledIo` handles when creating new resources. pub(super) io_dispatch: slab::Allocator<ScheduledIo>, /// Used to wake up the reactor from a call to `turn` - wakeup: mio::SetReadiness, + waker: mio::Waker, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -92,27 +93,22 @@ impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. pub(crate) fn new() -> io::Result<Driver> { - let io = mio::Poll::new()?; - let wakeup_pair = mio::Registration::new2(); + let poll = mio::Poll::new()?; + let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; + let registry = poll.registry().try_clone()?; + let slab = Slab::new(); let allocator = slab.allocator(); - io.register( - &wakeup_pair.0, - TOKEN_WAKEUP, - mio::Ready::readable(), - mio::PollOpt::level(), - )?; - Ok(Driver { tick: 0, events: Some(mio::Events::with_capacity(1024)), resources: slab, - _wakeup_registration: wakeup_pair.0, + poll, inner: Arc::new(Inner { - io, + registry, io_dispatch: allocator, - wakeup: wakeup_pair.1, + waker, }), }) } @@ -143,23 +139,18 @@ impl Driver { // Block waiting for an event to happen, peeling out how many events // happened. - match self.inner.io.poll(&mut events, max_wait) { + match self.poll.poll(&mut events, max_wait) { Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } // Process all the events that came in, dispatching appropriately - for event in events.iter() { let token = event.token(); - if token == TOKEN_WAKEUP { - self.inner - .wakeup - .set_readiness(mio::Ready::empty()) - .unwrap(); - } else { - self.dispatch(token, event.readiness()); + if token != TOKEN_WAKEUP { + self.dispatch(token, Ready::from_mio(event)); } } @@ -168,7 +159,7 @@ impl Driver { Ok(()) } - fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { + fn dispatch(&mut self, token: mio::Token, ready: Ready) { let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); let io = match self.resources.get(addr) { @@ -176,10 +167,9 @@ impl Driver { None => return, }; - let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| { - curr | ready.as_usize() - }); - if set.is_err() { + let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready); + + if res.is_err() { // token no longer valid! return; } @@ -194,7 +184,7 @@ impl Drop for Driver { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the // driver being shutdown. - io.wake(mio::Ready::all()); + io.wake(Ready::ALL); }) } } @@ -250,7 +240,7 @@ impl Handle { /// return immediately. fn wakeup(&self) { if let Some(inner) = self.inner() { - inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + inner.waker.wake().expect("failed to wake I/O driver"); } } @@ -279,8 +269,8 @@ impl Inner { /// The registration token is returned. pub(super) fn add_source( &self, - source: &dyn Evented, - ready: mio::Ready, + source: &mut impl mio::event::Source, + interest: mio::Interest, ) -> io::Result<slab::Ref<ScheduledIo>> { let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { io::Error::new( @@ -291,26 +281,23 @@ impl Inner { let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); - self.io - .register(source, mio::Token(token), ready, mio::PollOpt::edge())?; + self.registry + .register(source, mio::Token(token), interest)?; Ok(shared) } /// Deregisters an I/O resource from the reactor. - pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { - self.io.deregister(source) + pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { + self.registry.deregister(source) } } impl Direction { - pub(super) fn mask(self) -> mio::Ready { + pub(super) fn mask(self) -> Ready { match self { - Direction::Read => { - // Everything except writable is signaled through read. - mio::Ready::all() - mio::Ready::writable() - } - Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(), + Direction::Read => Ready::READABLE | Ready::READ_CLOSED, + Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, } } } |