summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-20 00:05:14 -0800
committerGitHub <noreply@github.com>2019-11-20 00:05:14 -0800
commit69975fb9601bbb21659db283d888470733bae660 (patch)
tree8db7c9a31e4125646634af09e197ae6479e10cc4
parent7c8b8877d440629ab9a27a2c9dcef859835d3536 (diff)
Refactor the I/O driver, extracting slab to `tokio::util`. (#1792)
The I/O driver is made private and moved to `tokio::io::driver`. `Registration` is moved to `tokio::io::Registration` and `PollEvented` is moved to `tokio::io::PollEvented`. Additionally, the concurrent slab used by the I/O driver is cleaned up and extracted to `tokio::util::slab`, allowing it to eventually be used by other types.
-rw-r--r--tokio/src/io/driver/mod.rs (renamed from tokio/src/net/driver/reactor/mod.rs)179
-rw-r--r--tokio/src/io/driver/platform.rs (renamed from tokio/src/net/driver/platform.rs)0
-rw-r--r--tokio/src/io/driver/scheduled_io.rs142
-rw-r--r--tokio/src/io/mod.rs10
-rw-r--r--tokio/src/io/poll_evented.rs (renamed from tokio/src/net/util/poll_evented.rs)6
-rw-r--r--tokio/src/io/registration.rs (renamed from tokio/src/net/driver/registration.rs)30
-rw-r--r--tokio/src/lib.rs4
-rw-r--r--tokio/src/net/driver/mod.rs141
-rw-r--r--tokio/src/net/driver/reactor/dispatch/iter.rs53
-rw-r--r--tokio/src/net/driver/reactor/dispatch/mod.rs36
-rw-r--r--tokio/src/net/driver/reactor/dispatch/pack.rs89
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/scheduled_io.rs169
-rw-r--r--tokio/src/net/driver/reactor/dispatch/page/stack.rs151
-rw-r--r--tokio/src/net/driver/reactor/dispatch/sharded_slab.rs274
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/mod.rs204
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/single_shard.rs181
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/loom/small_slab.rs473
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tests/mod.rs30
-rw-r--r--tokio/src/net/driver/reactor/dispatch/tid.rs168
-rw-r--r--tokio/src/net/driver/reactor/reactor/sharded_slab/page/slot.rs102
-rw-r--r--tokio/src/net/mod.rs5
-rw-r--r--tokio/src/net/tcp/listener.rs2
-rw-r--r--tokio/src/net/tcp/stream.rs3
-rw-r--r--tokio/src/net/udp/socket.rs2
-rw-r--r--tokio/src/net/unix/datagram.rs2
-rw-r--r--tokio/src/net/unix/listener.rs2
-rw-r--r--tokio/src/net/unix/stream.rs3
-rw-r--r--tokio/src/net/util/mod.rs4
-rw-r--r--tokio/src/process/unix/mod.rs2
-rw-r--r--tokio/src/process/windows.rs2
-rw-r--r--tokio/src/runtime/io.rs6
-rw-r--r--tokio/src/signal/unix.rs3
-rw-r--r--tokio/src/sync/task/atomic_waker.rs2
-rw-r--r--tokio/src/util/bit.rs86
-rw-r--r--tokio/src/util/mod.rs15
-rw-r--r--tokio/src/util/slab/addr.rs155
-rw-r--r--tokio/src/util/slab/entry.rs7
-rw-r--r--tokio/src/util/slab/generation.rs32
-rw-r--r--tokio/src/util/slab/mod.rs109
-rw-r--r--tokio/src/util/slab/page.rs (renamed from tokio/src/net/driver/reactor/dispatch/page/mod.rs)150
-rw-r--r--tokio/src/util/slab/shard.rs108
-rw-r--r--tokio/src/util/slab/slot.rs42
-rw-r--r--tokio/src/util/slab/stack.rs58
-rw-r--r--tokio/src/util/slab/tests/loom_slab.rs327
-rw-r--r--tokio/src/util/slab/tests/loom_stack.rs88
-rw-r--r--tokio/src/util/slab/tests/mod.rs2
-rw-r--r--tokio/tests/io_driver.rs (renamed from tokio/tests/net_driver.rs)18
-rw-r--r--tokio/tests/io_driver_drop.rs44
-rw-r--r--tokio/tests/net_driver_drop.rs47
49 files changed, 1352 insertions, 2416 deletions
diff --git a/tokio/src/net/driver/reactor/mod.rs b/tokio/src/io/driver/mod.rs
index 8bcded41..4f47dc38 100644
--- a/tokio/src/net/driver/reactor/mod.rs
+++ b/tokio/src/io/driver/mod.rs
@@ -1,31 +1,24 @@
-use crate::loom::sync::atomic::AtomicUsize;
-use crate::net::driver::platform;
-use crate::runtime::{Park, Unpark};
+pub(crate) mod platform;
-use std::sync::atomic::Ordering::SeqCst;
+mod scheduled_io;
+pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests
-mod dispatch;
-use dispatch::SingleShard;
-pub(crate) use dispatch::MAX_SOURCES;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::runtime::{Park, Unpark};
+use crate::util::slab::{Address, Slab};
use mio::event::Evented;
use std::cell::RefCell;
+use std::fmt;
use std::io;
use std::marker::PhantomData;
-#[cfg(all(unix, not(target_os = "fuchsia")))]
-use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{Arc, Weak};
+use std::sync::atomic::Ordering::SeqCst;
use std::task::Waker;
use std::time::Duration;
-use std::{fmt, usize};
-
-/// The core reactor, or event loop.
-///
-/// The event loop is the main source of blocking in an application which drives
-/// all other I/O events and notifications happening. Each event loop can have
-/// multiple handles pointing to it, each of which can then be used to create
-/// various I/O objects to interact with the event loop in interesting ways.
-pub struct Reactor {
+
+/// I/O driver, backed by Mio
+pub(crate) struct Driver {
/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,
@@ -35,33 +28,18 @@ pub struct Reactor {
_wakeup_registration: mio::Registration,
}
-/// A reference to a reactor.
-///
-/// A `Handle` is used for associating I/O objects with an event loop
-/// explicitly. Typically though you won't end up using a `Handle` that often
-/// and will instead use the default reactor for the execution context.
+/// A reference to an I/O driver
#[derive(Clone)]
-pub struct Handle {
+pub(crate) struct Handle {
inner: Weak<Inner>,
}
-/// Return value from the `turn` method on `Reactor`.
-///
-/// Currently this value doesn't actually provide any functionality, but it may
-/// in the future give insight into what happened during `turn`.
-#[derive(Debug)]
-pub struct Turn {
- _priv: (),
-}
-
pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
/// Dispatch slabs for I/O and futures events
- // TODO(eliza): once worker threads are available, replace this with a
- // properly sharded slab.
- pub(super) io_dispatch: SingleShard,
+ pub(super) io_dispatch: Slab<ScheduledIo>,
/// The number of sources in `io_dispatch`.
n_sources: AtomicUsize,
@@ -81,7 +59,7 @@ thread_local! {
static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None)
}
-const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES);
+const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
fn _assert_kinds() {
fn _assert<T: Send + Sync>() {}
@@ -89,11 +67,11 @@ fn _assert_kinds() {
_assert::<Handle>();
}
-// ===== impl Reactor =====
+// ===== impl Driver =====
#[derive(Debug)]
/// Guard that resets current reactor on drop.
-pub struct DefaultGuard<'a> {
+pub(crate) struct DefaultGuard<'a> {
_lifetime: PhantomData<&'a u8>,
}
@@ -107,7 +85,7 @@ impl Drop for DefaultGuard<'_> {
}
/// Sets handle for a default reactor, returning guard that unsets it on drop.
-pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
+pub(crate) fn set_default(handle: &Handle) -> DefaultGuard<'_> {
CURRENT_REACTOR.with(|current| {
let mut current = current.borrow_mut();
@@ -125,10 +103,10 @@ pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
}
}
-impl Reactor {
+impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
- pub fn new() -> io::Result<Reactor> {
+ pub(crate) fn new() -> io::Result<Driver> {
let io = mio::Poll::new()?;
let wakeup_pair = mio::Registration::new2();
@@ -139,12 +117,12 @@ impl Reactor {
mio::PollOpt::level(),
)?;
- Ok(Reactor {
+ Ok(Driver {
events: mio::Events::with_capacity(1024),
_wakeup_registration: wakeup_pair.0,
inner: Arc::new(Inner {
io,
- io_dispatch: SingleShard::new(),
+ io_dispatch: Slab::new(),
n_sources: AtomicUsize::new(0),
wakeup: wakeup_pair.1,
}),
@@ -157,52 +135,13 @@ impl Reactor {
/// Handles are cloneable and clones always refer to the same event loop.
/// This handle is typically passed into functions that create I/O objects
/// to bind them to this event loop.
- pub fn handle(&self) -> Handle {
+ pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::downgrade(&self.inner),
}
}
- /// Performs one iteration of the event loop, blocking on waiting for events
- /// for at most `max_wait` (forever if `None`).
- ///
- /// This method is the primary method of running this reactor and processing
- /// I/O events that occur. This method executes one iteration of an event
- /// loop, blocking at most once waiting for events to happen.
- ///
- /// If a `max_wait` is specified then the method should block no longer than
- /// the duration specified, but this shouldn't be used as a super-precise
- /// timer but rather a "ballpark approximation"
- ///
- /// # Return value
- ///
- /// This function returns an instance of `Turn`
- ///
- /// `Turn` as of today has no extra information with it and can be safely
- /// discarded. In the future `Turn` may contain information about what
- /// happened while this reactor blocked.
- ///
- /// # Errors
- ///
- /// This function may also return any I/O error which occurs when polling
- /// for readiness of I/O objects with the OS. This is quite unlikely to
- /// arise and typically mean that things have gone horribly wrong at that
- /// point. Currently this is primarily only known to happen for internal
- /// bugs to `tokio` itself.
- pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
- self.poll(max_wait)?;
- Ok(Turn { _priv: () })
- }
-
- /// Returns true if the reactor is currently idle.
- ///
- /// Idle is defined as all tasks that have been spawned have completed,
- /// either successfully or with an error.
- pub fn is_idle(&self) -> bool {
- self.inner.n_sources.load(SeqCst) == 0
- }
-
- fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
+ fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// Block waiting for an event to happen, peeling out how many events
// happened.
match self.inner.io.poll(&mut self.events, max_wait) {
@@ -232,13 +171,15 @@ impl Reactor {
let mut rd = None;
let mut wr = None;
- let io = match self.inner.io_dispatch.get(token.0) {
+ let address = Address::from_usize(token.0);
+
+ let io = match self.inner.io_dispatch.get(address) {
Some(io) => io,
None => return,
};
if io
- .set_readiness(token.0, |curr| curr | ready.as_usize())
+ .set_readiness(address, |curr| curr | ready.as_usize())
.is_err()
{
// token no longer valid!
@@ -263,14 +204,7 @@ impl Reactor {
}
}
-#[cfg(all(unix, not(target_os = "fuchsia")))]
-impl AsRawFd for Reactor {
- fn as_raw_fd(&self) -> RawFd {
- self.inner.io.as_raw_fd()
- }
-}
-
-impl Park for Reactor {
+impl Park for Driver {
type Unpark = Handle;
type Error = io::Error;
@@ -289,9 +223,9 @@ impl Park for Reactor {
}
}
-impl fmt::Debug for Reactor {
+impl fmt::Debug for Driver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Reactor")
+ write!(f, "Driver")
}
}
@@ -348,22 +282,24 @@ impl Inner {
/// Register an I/O resource with the reactor.
///
/// The registration token is returned.
- pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<usize> {
- let token = self.io_dispatch.alloc().ok_or_else(|| {
+ pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<Address> {
+ let address = self.io_dispatch.alloc().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})?;
+
self.n_sources.fetch_add(1, SeqCst);
+
self.io.register(
source,
- mio::Token(token),
+ mio::Token(address.to_usize()),
mio::Ready::all(),
mio::PollOpt::edge(),
)?;
- Ok(token)
+ Ok(address)
}
/// Deregisters an I/O resource from the reactor.
@@ -371,20 +307,21 @@ impl Inner {
self.io.deregister(source)
}
- pub(super) fn drop_source(&self, token: usize) {
- self.io_dispatch.remove(token);
+ pub(super) fn drop_source(&self, address: Address) {
+ self.io_dispatch.remove(address);
self.n_sources.fetch_sub(1, SeqCst);
}
/// Registers interest in the I/O resource associated with `token`.
- pub(super) fn register(&self, token: usize, dir: Direction, w: Waker) {
+ pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
let sched = self
.io_dispatch
.get(token)
- .unwrap_or_else(|| panic!("IO resource for token {} does not exist!", token));
+ .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));
+
let readiness = sched
.get_readiness(token)
- .unwrap_or_else(|| panic!("token {} no longer valid!", token));
+ .unwrap_or_else(|| panic!("token {:?} no longer valid!", token));
let (waker, ready) = match dir {
Direction::Read => (&sched.reader, !mio::Ready::writable()),
@@ -392,24 +329,13 @@ impl Inner {
};
waker.register(w);
+
if readiness & ready.as_usize() != 0 {
waker.wake();
}
}
}
-impl Drop for Inner {
- fn drop(&mut self) {
- // When a reactor is dropped it needs to wake up all blocked tasks as
- // they'll never receive a notification, and all connected I/O objects
- // will start returning errors pretty quickly.
- for io in self.io_dispatch.unique_iter() {
- io.writer.wake();
- io.reader.wake();
- }
- }
-}
-
impl Direction {
pub(super) fn mask(self) -> mio::Ready {
match self {
@@ -459,20 +385,16 @@ mod tests {
#[test]
fn tokens_unique_when_dropped() {
loom::model(|| {
- println!("\n--- iteration ---\n");
- let reactor = Reactor::new().unwrap();
+ let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
let token_1 = inner.add_source(&NotEvented).unwrap();
- println!("token 1: {:#x}", token_1);
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
- println!("dropped: {:#x}", token_1);
});
let token_2 = inner.add_source(&NotEvented).unwrap();
- println!("token 2: {:#x}", token_2);
thread.join().unwrap();
assert!(token_1 != token_2);
@@ -482,8 +404,7 @@ mod tests {
#[test]
fn tokens_unique_when_dropped_on_full_page() {
loom::model(|| {
- println!("\n--- iteration ---\n");
- let reactor = Reactor::new().unwrap();
+ let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
// add sources to fill up the first page so that the dropped index
@@ -493,14 +414,11 @@ mod tests {
}
let token_1 = inner.add_source(&NotEvented).unwrap();
- println!("token 1: {:#x}", token_1);
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
- println!("dropped: {:#x}", token_1);
});
let token_2 = inner.add_source(&NotEvented).unwrap();
- println!("token 2: {:#x}", token_2);
thread.join().unwrap();
assert!(token_1 != token_2);
@@ -510,19 +428,16 @@ mod tests {
#[test]
fn tokens_unique_concurrent_add() {
loom::model(|| {
- println!("\n--- iteration ---\n");
- let reactor = Reactor::new().unwrap();
+ let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
let thread = thread::spawn(move || {
let token_2 = inner2.add_source(&NotEvented).unwrap();
- println!("token 2: {:#x}", token_2);
token_2
});
let token_1 = inner.add_source(&NotEvented).unwrap();
- println!("token 1: {:#x}", token_1);
let token_2 = thread.join().unwrap();
assert!(token_1 != token_2);
diff --git a/tokio/src/net/driver/platform.rs b/tokio/src/io/driver/platform.rs
index 4cfe7345..4cfe7345 100644
--- a/tokio/src/net/driver/platform.rs
+++ b/tokio/src/io/driver/platform.rs
diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs
new file mode 100644
index 00000000..1eb6624c
--- /dev/null
+++ b/tokio/src/io/driver/scheduled_io.rs
@@ -0,0 +1,142 @@
+use crate::loom::future::AtomicWaker;
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::util::bit;
+use crate::util::slab::{Address, Entry, Generation};
+
+use std::sync::atomic::Ordering::{Acquire, AcqRel, SeqCst};
+
+#[derive(Debug)]
+pub(crate) struct ScheduledIo {
+ readiness: AtomicUsize,
+ pub(crate) reader: AtomicWaker,
+ pub(crate) writer: AtomicWaker,
+}
+
+const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH);
+
+impl Entry for ScheduledIo {
+ fn generation(&self) -> Generation {
+ unpack_generation(self.readiness.load(SeqCst))
+ }
+
+ fn reset(&self, generation: Generation) -> bool {
+ let mut current = self.readiness.load(Acquire);
+
+ loop {
+ if unpack_generation(current) != generation {
+ return false;
+ }
+
+ let next = PACK.pack(generation.next().to_usize(), 0);
+
+ match self.readiness.compare_exchange(
+ current,
+ next,
+ AcqRel,
+ Acquire,
+ ) {
+ Ok(_) => break,
+ Err(actual) => current = actual,
+ }
+ }
+
+ drop(self.reader.take_waker());
+ drop(self.writer.take_waker());
+
+ true
+ }
+}
+
+impl Default for ScheduledIo {
+ fn default() -> ScheduledIo {
+ ScheduledIo {
+ readiness: AtomicUsize::new(0),
+ reader: AtomicWaker::new(),
+ writer: AtomicWaker::new(),
+ }
+ }
+}
+
+impl ScheduledIo {
+ /// Returns the current readiness value of this `ScheduledIo`, if the
+ /// provided `token` is still a valid access.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `None`.
+ /// Otherwise, this returns the current readiness.
+ pub(crate) fn get_readiness(&self, address: Address) -> Option<usize> {
+ let ready = self.readiness.load(Acquire);
+
+ if unpack_generation(ready) != address.generation() {
+ return None;
+ }
+
+ Some(ready & !PACK.mask())
+ }
+
+ /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
+ /// the current value, returning the previous readiness value.
+ ///
+ /// # Arguments
+ /// - `token`: the token for this `ScheduledIo`.
+ /// - `f`: a closure returning a new readiness value given the previous
+ /// readiness.
+ ///
+ /// # Returns
+ ///
+ /// If the given token's generation no longer matches the `ScheduledIo`'s
+ /// generation, then the corresponding IO resource has been removed and
+ /// replaced with a new resource. In that case, this method returns `Err`.
+ /// Otherwise, this returns the previous readiness.
+ pub(crate) fn set_readiness(
+ &self,
+ address: Address,
+ f: impl Fn(usize) -> usize,
+ ) -> Result<usize, ()> {
+ let generation = address.generation();
+
+ let mut current = self.readiness.load(Acquire);
+
+ loop {
+ // Check that the generation for this access is still the current
+ // one.
+ if unpack_generation(current) != generation {
+ return Err(());
+ }
+ // Mask out the generation bits so that the modifying function
+ // doesn't see them.
+ let current_readiness = current & mio::Ready::all().as_usize();
+ let new = f(current_readiness);
+
+ debug_assert!(
+ new <= !PACK.max_value(),
+ "new readiness value would overwrite generation bits!"
+ );
+
+ match self.readiness.compare_exchange(
+ current,
+ PACK.pack(generation.to_usize(), new),
+ AcqRel,
+ Acquire,
+ ) {
+ Ok(_) => return Ok(current),
+ // we lost the race, retry!
+ Err(actual) => current = actual,
+ }
+ }
+ }
+}
+
+impl Drop for ScheduledIo {
+ fn drop(&mut self) {
+ self.writer.wake();
+ self.reader.wake();
+ }
+}
+
+fn unpack_generation(src: usize) -> Generation {
+ Generation::new(PACK.unpack(src))
+}
diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index df1888ce..6fa8a17f 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -49,6 +49,16 @@ pub use self::async_read::AsyncRead;
mod async_write;
pub use self::async_write::AsyncWrite;
+cfg_io_driver! {
+ pub(crate) mod driver;
+
+ mod poll_evented;
+ pub use poll_evented::PollEvented;
+
+ mod registration;
+ pub use registration::Registration;
+}
+
cfg_io_std! {
mod stderr;
pub use stderr::{stderr, Stderr};
diff --git a/tokio/src/net/util/poll_evented.rs b/tokio/src/io/poll_evented.rs
index 08dea3f3..d1644ca2 100644
--- a/tokio/src/net/util/poll_evented.rs
+++ b/tokio/src/io/poll_evented.rs
@@ -1,5 +1,5 @@
-use crate::io::{AsyncRead, AsyncWrite};
-use crate::net::driver::{platform, Registration};
+use crate::io::{AsyncRead, AsyncWrite, Registration};
+use crate::io::driver::{platform};
use mio::event::Evented;
use std::fmt;
@@ -52,7 +52,7 @@ use std::task::{Context, Poll};
/// [`clear_read_ready`].
///
/// ```rust
-/// use tokio::net::util::PollEvented;
+/// use tokio::io::PollEvented;
///
/// use futures::ready;
/// use mio::Ready;
diff --git a/tokio/src/net/driver/registration.rs b/tokio/src/io/registration.rs
index 8e946d5a..1a8db6d7 100644
--- a/tokio/src/net/driver/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -1,9 +1,9 @@
-use super::platform;
-use super::reactor::{Direction, Handle};
+use crate::io::driver::{Direction, Handle, platform};
+use crate::util::slab::Address;
use mio::{self, Evented};
use std::task::{Context, Poll};
-use std::{io, usize};
+use std::io;
/// Associates an I/O resource with the reactor instance that drives it.
///
@@ -38,7 +38,7 @@ use std::{io, usize};
#[derive(Debug)]
pub struct Registration {
handle: Handle,
- token: usize,
+ address: Address,
}
// ===== impl Registration =====
@@ -50,12 +50,12 @@ impl Registration {
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
- pub fn new<T>(io: &T) -> io::Result<Self>
+ pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
{
let handle = Handle::current();
- let token = if let Some(inner) = handle.inner() {
+ let address = if let Some(inner) = handle.inner() {
inner.add_source(io)?
} else {
return Err(io::Error::new(
@@ -63,7 +63,8 @@ impl Registration {
"failed to find event loop",
));
};
- Ok(Self { handle, token })
+
+ Ok(Registration { handle, address })
}
/// Deregister the I/O resource from the reactor it is associated with.
@@ -212,13 +213,13 @@ impl Registration {
// If the task should be notified about new events, ensure that it has
// been registered
if let Some(ref cx) = cx {
- inner.register(self.token, direction, cx.waker().clone())
+ inner.register(self.address, direction, cx.waker().clone())
}
let mask = direction.mask();
let mask_no_hup = (mask - platform::hup()).as_usize();
- let sched = inner.io_dispatch.get(self.token).unwrap();
+ let sched = inner.io_dispatch.get(self.address).unwrap();
// This consumes the current readiness state **except** for HUP. HUP is
// excluded because a) it is a final state and never transitions out of
@@ -229,8 +230,9 @@ impl Registration {
// `poll_ready` is called again with a _`direction` of `Write`, the HUP
// state would not be visible.
let curr_ready = sched
- .set_readiness(self.token, |curr| curr & (!mask_no_hup))
- .unwrap_or_else(|_| panic!("token {} no longer valid!", self.token));
+ .set_readiness(self.address, |curr| curr & (!mask_no_hup))
+ .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address));
+
let mut ready = mask & mio::Ready::from_usize(curr_ready);
if ready.is_empty() {
@@ -243,8 +245,8 @@ impl Registration {
// Try again
let curr_ready = sched
- .set_readiness(self.