diff options
author | Carl Lerche <me@carllerche.com> | 2019-11-20 00:05:14 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-11-20 00:05:14 -0800 |
commit | 69975fb9601bbb21659db283d888470733bae660 (patch) | |
tree | 8db7c9a31e4125646634af09e197ae6479e10cc4 | |
parent | 7c8b8877d440629ab9a27a2c9dcef859835d3536 (diff) |
Refactor the I/O driver, extracting slab to `tokio::util`. (#1792)
The I/O driver is made private and moved to `tokio::io::driver`. `Registration` is
moved to `tokio::io::Registration` and `PollEvented` is moved to `tokio::io::PollEvented`.
Additionally, the concurrent slab used by the I/O driver is cleaned up and extracted to
`tokio::util::slab`, allowing it to eventually be used by other types.
49 files changed, 1352 insertions, 2416 deletions
diff --git a/tokio/src/net/driver/reactor/mod.rs b/tokio/src/io/driver/mod.rs index 8bcded41..4f47dc38 100644 --- a/tokio/src/net/driver/reactor/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,31 +1,24 @@ -use crate::loom::sync::atomic::AtomicUsize; -use crate::net::driver::platform; -use crate::runtime::{Park, Unpark}; +pub(crate) mod platform; -use std::sync::atomic::Ordering::SeqCst; +mod scheduled_io; +pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests -mod dispatch; -use dispatch::SingleShard; -pub(crate) use dispatch::MAX_SOURCES; +use crate::loom::sync::atomic::AtomicUsize; +use crate::runtime::{Park, Unpark}; +use crate::util::slab::{Address, Slab}; use mio::event::Evented; use std::cell::RefCell; +use std::fmt; use std::io; use std::marker::PhantomData; -#[cfg(all(unix, not(target_os = "fuchsia")))] -use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::{Arc, Weak}; +use std::sync::atomic::Ordering::SeqCst; use std::task::Waker; use std::time::Duration; -use std::{fmt, usize}; - -/// The core reactor, or event loop. -/// -/// The event loop is the main source of blocking in an application which drives -/// all other I/O events and notifications happening. Each event loop can have -/// multiple handles pointing to it, each of which can then be used to create -/// various I/O objects to interact with the event loop in interesting ways. -pub struct Reactor { + +/// I/O driver, backed by Mio +pub(crate) struct Driver { /// Reuse the `mio::Events` value across calls to poll. events: mio::Events, @@ -35,33 +28,18 @@ pub struct Reactor { _wakeup_registration: mio::Registration, } -/// A reference to a reactor. -/// -/// A `Handle` is used for associating I/O objects with an event loop -/// explicitly. Typically though you won't end up using a `Handle` that often -/// and will instead use the default reactor for the execution context. +/// A reference to an I/O driver #[derive(Clone)] -pub struct Handle { +pub(crate) struct Handle { inner: Weak<Inner>, } -/// Return value from the `turn` method on `Reactor`. -/// -/// Currently this value doesn't actually provide any functionality, but it may -/// in the future give insight into what happened during `turn`. -#[derive(Debug)] -pub struct Turn { - _priv: (), -} - pub(super) struct Inner { /// The underlying system event queue. io: mio::Poll, /// Dispatch slabs for I/O and futures events - // TODO(eliza): once worker threads are available, replace this with a - // properly sharded slab. - pub(super) io_dispatch: SingleShard, + pub(super) io_dispatch: Slab<ScheduledIo>, /// The number of sources in `io_dispatch`. n_sources: AtomicUsize, @@ -81,7 +59,7 @@ thread_local! { static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None) } -const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); +const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL); fn _assert_kinds() { fn _assert<T: Send + Sync>() {} @@ -89,11 +67,11 @@ fn _assert_kinds() { _assert::<Handle>(); } -// ===== impl Reactor ===== +// ===== impl Driver ===== #[derive(Debug)] /// Guard that resets current reactor on drop. -pub struct DefaultGuard<'a> { +pub(crate) struct DefaultGuard<'a> { _lifetime: PhantomData<&'a u8>, } @@ -107,7 +85,7 @@ impl Drop for DefaultGuard<'_> { } /// Sets handle for a default reactor, returning guard that unsets it on drop. -pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { +pub(crate) fn set_default(handle: &Handle) -> DefaultGuard<'_> { CURRENT_REACTOR.with(|current| { let mut current = current.borrow_mut(); @@ -125,10 +103,10 @@ pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { } } -impl Reactor { +impl Driver { /// Creates a new event loop, returning any error that happened during the /// creation. - pub fn new() -> io::Result<Reactor> { + pub(crate) fn new() -> io::Result<Driver> { let io = mio::Poll::new()?; let wakeup_pair = mio::Registration::new2(); @@ -139,12 +117,12 @@ impl Reactor { mio::PollOpt::level(), )?; - Ok(Reactor { + Ok(Driver { events: mio::Events::with_capacity(1024), _wakeup_registration: wakeup_pair.0, inner: Arc::new(Inner { io, - io_dispatch: SingleShard::new(), + io_dispatch: Slab::new(), n_sources: AtomicUsize::new(0), wakeup: wakeup_pair.1, }), @@ -157,52 +135,13 @@ impl Reactor { /// Handles are cloneable and clones always refer to the same event loop. /// This handle is typically passed into functions that create I/O objects /// to bind them to this event loop. - pub fn handle(&self) -> Handle { + pub(crate) fn handle(&self) -> Handle { Handle { inner: Arc::downgrade(&self.inner), } } - /// Performs one iteration of the event loop, blocking on waiting for events - /// for at most `max_wait` (forever if `None`). - /// - /// This method is the primary method of running this reactor and processing - /// I/O events that occur. This method executes one iteration of an event - /// loop, blocking at most once waiting for events to happen. - /// - /// If a `max_wait` is specified then the method should block no longer than - /// the duration specified, but this shouldn't be used as a super-precise - /// timer but rather a "ballpark approximation" - /// - /// # Return value - /// - /// This function returns an instance of `Turn` - /// - /// `Turn` as of today has no extra information with it and can be safely - /// discarded. In the future `Turn` may contain information about what - /// happened while this reactor blocked. - /// - /// # Errors - /// - /// This function may also return any I/O error which occurs when polling - /// for readiness of I/O objects with the OS. This is quite unlikely to - /// arise and typically mean that things have gone horribly wrong at that - /// point. Currently this is primarily only known to happen for internal - /// bugs to `tokio` itself. - pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> { - self.poll(max_wait)?; - Ok(Turn { _priv: () }) - } - - /// Returns true if the reactor is currently idle. - /// - /// Idle is defined as all tasks that have been spawned have completed, - /// either successfully or with an error. - pub fn is_idle(&self) -> bool { - self.inner.n_sources.load(SeqCst) == 0 - } - - fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> { + fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> { // Block waiting for an event to happen, peeling out how many events // happened. match self.inner.io.poll(&mut self.events, max_wait) { @@ -232,13 +171,15 @@ impl Reactor { let mut rd = None; let mut wr = None; - let io = match self.inner.io_dispatch.get(token.0) { + let address = Address::from_usize(token.0); + + let io = match self.inner.io_dispatch.get(address) { Some(io) => io, None => return, }; if io - .set_readiness(token.0, |curr| curr | ready.as_usize()) + .set_readiness(address, |curr| curr | ready.as_usize()) .is_err() { // token no longer valid! @@ -263,14 +204,7 @@ impl Reactor { } } -#[cfg(all(unix, not(target_os = "fuchsia")))] -impl AsRawFd for Reactor { - fn as_raw_fd(&self) -> RawFd { - self.inner.io.as_raw_fd() - } -} - -impl Park for Reactor { +impl Park for Driver { type Unpark = Handle; type Error = io::Error; @@ -289,9 +223,9 @@ impl Park for Reactor { } } -impl fmt::Debug for Reactor { +impl fmt::Debug for Driver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Reactor") + write!(f, "Driver") } } @@ -348,22 +282,24 @@ impl Inner { /// Register an I/O resource with the reactor. /// /// The registration token is returned. - pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<usize> { - let token = self.io_dispatch.alloc().ok_or_else(|| { + pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<Address> { + let address = self.io_dispatch.alloc().ok_or_else(|| { io::Error::new( io::ErrorKind::Other, "reactor at max registered I/O resources", ) })?; + self.n_sources.fetch_add(1, SeqCst); + self.io.register( source, - mio::Token(token), + mio::Token(address.to_usize()), mio::Ready::all(), mio::PollOpt::edge(), )?; - Ok(token) + Ok(address) } /// Deregisters an I/O resource from the reactor. @@ -371,20 +307,21 @@ impl Inner { self.io.deregister(source) } - pub(super) fn drop_source(&self, token: usize) { - self.io_dispatch.remove(token); + pub(super) fn drop_source(&self, address: Address) { + self.io_dispatch.remove(address); self.n_sources.fetch_sub(1, SeqCst); } /// Registers interest in the I/O resource associated with `token`. - pub(super) fn register(&self, token: usize, dir: Direction, w: Waker) { + pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) { let sched = self .io_dispatch .get(token) - .unwrap_or_else(|| panic!("IO resource for token {} does not exist!", token)); + .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token)); + let readiness = sched .get_readiness(token) - .unwrap_or_else(|| panic!("token {} no longer valid!", token)); + .unwrap_or_else(|| panic!("token {:?} no longer valid!", token)); let (waker, ready) = match dir { Direction::Read => (&sched.reader, !mio::Ready::writable()), @@ -392,24 +329,13 @@ impl Inner { }; waker.register(w); + if readiness & ready.as_usize() != 0 { waker.wake(); } } } -impl Drop for Inner { - fn drop(&mut self) { - // When a reactor is dropped it needs to wake up all blocked tasks as - // they'll never receive a notification, and all connected I/O objects - // will start returning errors pretty quickly. - for io in self.io_dispatch.unique_iter() { - io.writer.wake(); - io.reader.wake(); - } - } -} - impl Direction { pub(super) fn mask(self) -> mio::Ready { match self { @@ -459,20 +385,16 @@ mod tests { #[test] fn tokens_unique_when_dropped() { loom::model(|| { - println!("\n--- iteration ---\n"); - let reactor = Reactor::new().unwrap(); + let reactor = Driver::new().unwrap(); let inner = reactor.inner; let inner2 = inner.clone(); let token_1 = inner.add_source(&NotEvented).unwrap(); - println!("token 1: {:#x}", token_1); let thread = thread::spawn(move || { inner2.drop_source(token_1); - println!("dropped: {:#x}", token_1); }); let token_2 = inner.add_source(&NotEvented).unwrap(); - println!("token 2: {:#x}", token_2); thread.join().unwrap(); assert!(token_1 != token_2); @@ -482,8 +404,7 @@ mod tests { #[test] fn tokens_unique_when_dropped_on_full_page() { loom::model(|| { - println!("\n--- iteration ---\n"); - let reactor = Reactor::new().unwrap(); + let reactor = Driver::new().unwrap(); let inner = reactor.inner; let inner2 = inner.clone(); // add sources to fill up the first page so that the dropped index @@ -493,14 +414,11 @@ mod tests { } let token_1 = inner.add_source(&NotEvented).unwrap(); - println!("token 1: {:#x}", token_1); let thread = thread::spawn(move || { inner2.drop_source(token_1); - println!("dropped: {:#x}", token_1); }); let token_2 = inner.add_source(&NotEvented).unwrap(); - println!("token 2: {:#x}", token_2); thread.join().unwrap(); assert!(token_1 != token_2); @@ -510,19 +428,16 @@ mod tests { #[test] fn tokens_unique_concurrent_add() { loom::model(|| { - println!("\n--- iteration ---\n"); - let reactor = Reactor::new().unwrap(); + let reactor = Driver::new().unwrap(); let inner = reactor.inner; let inner2 = inner.clone(); let thread = thread::spawn(move || { let token_2 = inner2.add_source(&NotEvented).unwrap(); - println!("token 2: {:#x}", token_2); token_2 }); let token_1 = inner.add_source(&NotEvented).unwrap(); - println!("token 1: {:#x}", token_1); let token_2 = thread.join().unwrap(); assert!(token_1 != token_2); diff --git a/tokio/src/net/driver/platform.rs b/tokio/src/io/driver/platform.rs index 4cfe7345..4cfe7345 100644 --- a/tokio/src/net/driver/platform.rs +++ b/tokio/src/io/driver/platform.rs diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs new file mode 100644 index 00000000..1eb6624c --- /dev/null +++ b/tokio/src/io/driver/scheduled_io.rs @@ -0,0 +1,142 @@ +use crate::loom::future::AtomicWaker; +use crate::loom::sync::atomic::AtomicUsize; +use crate::util::bit; +use crate::util::slab::{Address, Entry, Generation}; + +use std::sync::atomic::Ordering::{Acquire, AcqRel, SeqCst}; + +#[derive(Debug)] +pub(crate) struct ScheduledIo { + readiness: AtomicUsize, + pub(crate) reader: AtomicWaker, + pub(crate) writer: AtomicWaker, +} + +const PACK: bit::Pack = bit::Pack::most_significant(Generation::WIDTH); + +impl Entry for ScheduledIo { + fn generation(&self) -> Generation { + unpack_generation(self.readiness.load(SeqCst)) + } + + fn reset(&self, generation: Generation) -> bool { + let mut current = self.readiness.load(Acquire); + + loop { + if unpack_generation(current) != generation { + return false; + } + + let next = PACK.pack(generation.next().to_usize(), 0); + + match self.readiness.compare_exchange( + current, + next, + AcqRel, + Acquire, + ) { + Ok(_) => break, + Err(actual) => current = actual, + } + } + + drop(self.reader.take_waker()); + drop(self.writer.take_waker()); + + true + } +} + +impl Default for ScheduledIo { + fn default() -> ScheduledIo { + ScheduledIo { + readiness: AtomicUsize::new(0), + reader: AtomicWaker::new(), + writer: AtomicWaker::new(), + } + } +} + +impl ScheduledIo { + /// Returns the current readiness value of this `ScheduledIo`, if the + /// provided `token` is still a valid access. + /// + /// # Returns + /// + /// If the given token's generation no longer matches the `ScheduledIo`'s + /// generation, then the corresponding IO resource has been removed and + /// replaced with a new resource. In that case, this method returns `None`. + /// Otherwise, this returns the current readiness. + pub(crate) fn get_readiness(&self, address: Address) -> Option<usize> { + let ready = self.readiness.load(Acquire); + + if unpack_generation(ready) != address.generation() { + return None; + } + + Some(ready & !PACK.mask()) + } + + /// Sets the readiness on this `ScheduledIo` by invoking the given closure on + /// the current value, returning the previous readiness value. + /// + /// # Arguments + /// - `token`: the token for this `ScheduledIo`. + /// - `f`: a closure returning a new readiness value given the previous + /// readiness. + /// + /// # Returns + /// + /// If the given token's generation no longer matches the `ScheduledIo`'s + /// generation, then the corresponding IO resource has been removed and + /// replaced with a new resource. In that case, this method returns `Err`. + /// Otherwise, this returns the previous readiness. + pub(crate) fn set_readiness( + &self, + address: Address, + f: impl Fn(usize) -> usize, + ) -> Result<usize, ()> { + let generation = address.generation(); + + let mut current = self.readiness.load(Acquire); + + loop { + // Check that the generation for this access is still the current + // one. + if unpack_generation(current) != generation { + return Err(()); + } + // Mask out the generation bits so that the modifying function + // doesn't see them. + let current_readiness = current & mio::Ready::all().as_usize(); + let new = f(current_readiness); + + debug_assert!( + new <= !PACK.max_value(), + "new readiness value would overwrite generation bits!" + ); + + match self.readiness.compare_exchange( + current, + PACK.pack(generation.to_usize(), new), + AcqRel, + Acquire, + ) { + Ok(_) => return Ok(current), + // we lost the race, retry! + Err(actual) => current = actual, + } + } + } +} + +impl Drop for ScheduledIo { + fn drop(&mut self) { + self.writer.wake(); + self.reader.wake(); + } +} + +fn unpack_generation(src: usize) -> Generation { + Generation::new(PACK.unpack(src)) +} diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index df1888ce..6fa8a17f 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -49,6 +49,16 @@ pub use self::async_read::AsyncRead; mod async_write; pub use self::async_write::AsyncWrite; +cfg_io_driver! { + pub(crate) mod driver; + + mod poll_evented; + pub use poll_evented::PollEvented; + + mod registration; + pub use registration::Registration; +} + cfg_io_std! { mod stderr; pub use stderr::{stderr, Stderr}; diff --git a/tokio/src/net/util/poll_evented.rs b/tokio/src/io/poll_evented.rs index 08dea3f3..d1644ca2 100644 --- a/tokio/src/net/util/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,5 +1,5 @@ -use crate::io::{AsyncRead, AsyncWrite}; -use crate::net::driver::{platform, Registration}; +use crate::io::{AsyncRead, AsyncWrite, Registration}; +use crate::io::driver::{platform}; use mio::event::Evented; use std::fmt; @@ -52,7 +52,7 @@ use std::task::{Context, Poll}; /// [`clear_read_ready`]. /// /// ```rust -/// use tokio::net::util::PollEvented; +/// use tokio::io::PollEvented; /// /// use futures::ready; /// use mio::Ready; diff --git a/tokio/src/net/driver/registration.rs b/tokio/src/io/registration.rs index 8e946d5a..1a8db6d7 100644 --- a/tokio/src/net/driver/registration.rs +++ b/tokio/src/io/registration.rs @@ -1,9 +1,9 @@ -use super::platform; -use super::reactor::{Direction, Handle}; +use crate::io::driver::{Direction, Handle, platform}; +use crate::util::slab::Address; use mio::{self, Evented}; use std::task::{Context, Poll}; -use std::{io, usize}; +use std::io; /// Associates an I/O resource with the reactor instance that drives it. /// @@ -38,7 +38,7 @@ use std::{io, usize}; #[derive(Debug)] pub struct Registration { handle: Handle, - token: usize, + address: Address, } // ===== impl Registration ===== @@ -50,12 +50,12 @@ impl Registration { /// /// - `Ok` if the registration happened successfully /// - `Err` if an error was encountered during registration - pub fn new<T>(io: &T) -> io::Result<Self> + pub fn new<T>(io: &T) -> io::Result<Registration> where T: Evented, { let handle = Handle::current(); - let token = if let Some(inner) = handle.inner() { + let address = if let Some(inner) = handle.inner() { inner.add_source(io)? } else { return Err(io::Error::new( @@ -63,7 +63,8 @@ impl Registration { "failed to find event loop", )); }; - Ok(Self { handle, token }) + + Ok(Registration { handle, address }) } /// Deregister the I/O resource from the reactor it is associated with. @@ -212,13 +213,13 @@ impl Registration { // If the task should be notified about new events, ensure that it has // been registered if let Some(ref cx) = cx { - inner.register(self.token, direction, cx.waker().clone()) + inner.register(self.address, direction, cx.waker().clone()) } let mask = direction.mask(); let mask_no_hup = (mask - platform::hup()).as_usize(); - let sched = inner.io_dispatch.get(self.token).unwrap(); + let sched = inner.io_dispatch.get(self.address).unwrap(); // This consumes the current readiness state **except** for HUP. HUP is // excluded because a) it is a final state and never transitions out of @@ -229,8 +230,9 @@ impl Registration { // `poll_ready` is called again with a _`direction` of `Write`, the HUP // state would not be visible. let curr_ready = sched - .set_readiness(self.token, |curr| curr & (!mask_no_hup)) - .unwrap_or_else(|_| panic!("token {} no longer valid!", self.token)); + .set_readiness(self.address, |curr| curr & (!mask_no_hup)) + .unwrap_or_else(|_| panic!("address {:?} no longer valid!", self.address)); + let mut ready = mask & mio::Ready::from_usize(curr_ready); if ready.is_empty() { @@ -243,8 +245,8 @@ impl Registration { // Try again let curr_ready = sched - .set_readiness(self. |