summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/driver/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/io/driver/mod.rs')
-rw-r--r--tokio/src/io/driver/mod.rs83
1 files changed, 35 insertions, 48 deletions
diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs
index 30b30203..c4f5887a 100644
--- a/tokio/src/io/driver/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -1,4 +1,5 @@
-pub(crate) mod platform;
+mod ready;
+use ready::Ready;
mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
@@ -8,7 +9,6 @@ use crate::runtime::context;
use crate::util::bit;
use crate::util::slab::{self, Slab};
-use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
@@ -27,10 +27,11 @@ pub(crate) struct Driver {
/// with this driver.
resources: Slab<ScheduledIo>,
+ /// The system event queue
+ poll: mio::Poll,
+
/// State shared between the reactor and the handles.
inner: Arc<Inner>,
-
- _wakeup_registration: mio::Registration,
}
/// A reference to an I/O driver
@@ -41,18 +42,18 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
- readiness: mio::Ready,
+ ready: Ready,
}
pub(super) struct Inner {
- /// The underlying system event queue.
- io: mio::Poll,
+ /// Registers I/O resources
+ registry: mio::Registry,
/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
/// Used to wake up the reactor from a call to `turn`
- wakeup: mio::SetReadiness,
+ waker: mio::Waker,
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
@@ -92,27 +93,22 @@ impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<Driver> {
- let io = mio::Poll::new()?;
- let wakeup_pair = mio::Registration::new2();
+ let poll = mio::Poll::new()?;
+ let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
+ let registry = poll.registry().try_clone()?;
+
let slab = Slab::new();
let allocator = slab.allocator();
- io.register(
- &wakeup_pair.0,
- TOKEN_WAKEUP,
- mio::Ready::readable(),
- mio::PollOpt::level(),
- )?;
-
Ok(Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
- _wakeup_registration: wakeup_pair.0,
+ poll,
inner: Arc::new(Inner {
- io,
+ registry,
io_dispatch: allocator,
- wakeup: wakeup_pair.1,
+ waker,
}),
})
}
@@ -143,23 +139,18 @@ impl Driver {
// Block waiting for an event to happen, peeling out how many events
// happened.
- match self.inner.io.poll(&mut events, max_wait) {
+ match self.poll.poll(&mut events, max_wait) {
Ok(_) => {}
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
// Process all the events that came in, dispatching appropriately
-
for event in events.iter() {
let token = event.token();
- if token == TOKEN_WAKEUP {
- self.inner
- .wakeup
- .set_readiness(mio::Ready::empty())
- .unwrap();
- } else {
- self.dispatch(token, event.readiness());
+ if token != TOKEN_WAKEUP {
+ self.dispatch(token, Ready::from_mio(event));
}
}
@@ -168,7 +159,7 @@ impl Driver {
Ok(())
}
- fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
+ fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
let io = match self.resources.get(addr) {
@@ -176,10 +167,9 @@ impl Driver {
None => return,
};
- let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
- curr | ready.as_usize()
- });
- if set.is_err() {
+ let res = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| curr | ready);
+
+ if res.is_err() {
// token no longer valid!
return;
}
@@ -194,7 +184,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
- io.wake(mio::Ready::all());
+ io.wake(Ready::ALL);
})
}
}
@@ -250,7 +240,7 @@ impl Handle {
/// return immediately.
fn wakeup(&self) {
if let Some(inner) = self.inner() {
- inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
+ inner.waker.wake().expect("failed to wake I/O driver");
}
}
@@ -279,8 +269,8 @@ impl Inner {
/// The registration token is returned.
pub(super) fn add_source(
&self,
- source: &dyn Evented,
- ready: mio::Ready,
+ source: &mut impl mio::event::Source,
+ interest: mio::Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
@@ -291,26 +281,23 @@ impl Inner {
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
- self.io
- .register(source, mio::Token(token), ready, mio::PollOpt::edge())?;
+ self.registry
+ .register(source, mio::Token(token), interest)?;
Ok(shared)
}
/// Deregisters an I/O resource from the reactor.
- pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
- self.io.deregister(source)
+ pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
+ self.registry.deregister(source)
}
}
impl Direction {
- pub(super) fn mask(self) -> mio::Ready {
+ pub(super) fn mask(self) -> Ready {
match self {
- Direction::Read => {
- // Everything except writable is signaled through read.
- mio::Ready::all() - mio::Ready::writable()
- }
- Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
+ Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
+ Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
}
}
}