diff options
Diffstat (limited to 'tokio/src/net/driver/reactor.rs')
-rw-r--r-- | tokio/src/net/driver/reactor.rs | 451 |
1 files changed, 451 insertions, 0 deletions
diff --git a/tokio/src/net/driver/reactor.rs b/tokio/src/net/driver/reactor.rs new file mode 100644 index 00000000..384abe47 --- /dev/null +++ b/tokio/src/net/driver/reactor.rs @@ -0,0 +1,451 @@ +use super::platform; + +use tokio_executor::park::{Park, Unpark}; +use tokio_sync::AtomicWaker; + +use mio::event::Evented; +use slab::Slab; +use std::cell::RefCell; +use std::io; +use std::marker::PhantomData; +#[cfg(all(unix, not(target_os = "fuchsia")))] +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::{Arc, RwLock, Weak}; +use std::task::Waker; +use std::time::Duration; +use std::{fmt, usize}; + +/// The core reactor, or event loop. +/// +/// The event loop is the main source of blocking in an application which drives +/// all other I/O events and notifications happening. Each event loop can have +/// multiple handles pointing to it, each of which can then be used to create +/// various I/O objects to interact with the event loop in interesting ways. +pub struct Reactor { + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// State shared between the reactor and the handles. + inner: Arc<Inner>, + + _wakeup_registration: mio::Registration, +} + +/// A reference to a reactor. +/// +/// A `Handle` is used for associating I/O objects with an event loop +/// explicitly. Typically though you won't end up using a `Handle` that often +/// and will instead use the default reactor for the execution context. +#[derive(Clone)] +pub struct Handle { + inner: Weak<Inner>, +} + +/// Return value from the `turn` method on `Reactor`. +/// +/// Currently this value doesn't actually provide any functionality, but it may +/// in the future give insight into what happened during `turn`. +#[derive(Debug)] +pub struct Turn { + _priv: (), +} + +pub(super) struct Inner { + /// The underlying system event queue. + io: mio::Poll, + + /// ABA guard counter + next_aba_guard: AtomicUsize, + + /// Dispatch slabs for I/O and futures events + pub(super) io_dispatch: RwLock<Slab<ScheduledIo>>, + + /// Used to wake up the reactor from a call to `turn` + wakeup: mio::SetReadiness, +} + +pub(super) struct ScheduledIo { + aba_guard: usize, + pub(super) readiness: AtomicUsize, + pub(super) reader: AtomicWaker, + pub(super) writer: AtomicWaker, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(super) enum Direction { + Read, + Write, +} + +thread_local! { + /// Tracks the reactor for the current execution context. + static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None) +} + +const TOKEN_SHIFT: usize = 22; + +// Kind of arbitrary, but this reserves some token space for later usage. +const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; +const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +// ===== impl Reactor ===== + +#[derive(Debug)] +/// Guard that resets current reactor on drop. +pub struct DefaultGuard<'a> { + _lifetime: PhantomData<&'a u8>, +} + +impl Drop for DefaultGuard<'_> { + fn drop(&mut self) { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + *current = None; + }); + } +} + +/// Sets handle for a default reactor, returning guard that unsets it on drop. +pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + + assert!( + current.is_none(), + "default Tokio reactor already set \ + for execution context" + ); + + *current = Some(handle.clone()); + }); + + DefaultGuard { + _lifetime: PhantomData, + } +} + +impl Reactor { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub fn new() -> io::Result<Reactor> { + let io = mio::Poll::new()?; + let wakeup_pair = mio::Registration::new2(); + + io.register( + &wakeup_pair.0, + TOKEN_WAKEUP, + mio::Ready::readable(), + mio::PollOpt::level(), + )?; + + Ok(Reactor { + events: mio::Events::with_capacity(1024), + _wakeup_registration: wakeup_pair.0, + inner: Arc::new(Inner { + io, + next_aba_guard: AtomicUsize::new(0), + io_dispatch: RwLock::new(Slab::with_capacity(1)), + wakeup: wakeup_pair.1, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + /// Performs one iteration of the event loop, blocking on waiting for events + /// for at most `max_wait` (forever if `None`). + /// + /// This method is the primary method of running this reactor and processing + /// I/O events that occur. This method executes one iteration of an event + /// loop, blocking at most once waiting for events to happen. + /// + /// If a `max_wait` is specified then the method should block no longer than + /// the duration specified, but this shouldn't be used as a super-precise + /// timer but rather a "ballpark approximation" + /// + /// # Return value + /// + /// This function returns an instance of `Turn` + /// + /// `Turn` as of today has no extra information with it and can be safely + /// discarded. In the future `Turn` may contain information about what + /// happened while this reactor blocked. + /// + /// # Errors + /// + /// This function may also return any I/O error which occurs when polling + /// for readiness of I/O objects with the OS. This is quite unlikely to + /// arise and typically mean that things have gone horribly wrong at that + /// point. Currently this is primarily only known to happen for internal + /// bugs to `tokio` itself. + pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> { + self.poll(max_wait)?; + Ok(Turn { _priv: () }) + } + + /// Returns true if the reactor is currently idle. + /// + /// Idle is defined as all tasks that have been spawned have completed, + /// either successfully or with an error. + pub fn is_idle(&self) -> bool { + self.inner.io_dispatch.read().unwrap().is_empty() + } + + fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> { + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.inner.io.poll(&mut self.events, max_wait) { + Ok(_) => {} + Err(e) => return Err(e), + } + + // Process all the events that came in, dispatching appropriately + + for event in self.events.iter() { + let token = event.token(); + + if token == TOKEN_WAKEUP { + self.inner + .wakeup + .set_readiness(mio::Ready::empty()) + .unwrap(); + } else { + self.dispatch(token, event.readiness()); + } + } + + Ok(()) + } + + fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + let aba_guard = token.0 & !MAX_SOURCES; + let token = token.0 & MAX_SOURCES; + + let mut rd = None; + let mut wr = None; + + // Create a scope to ensure that notifying the tasks stays out of the + // lock's critical section. + { + let io_dispatch = self.inner.io_dispatch.read().unwrap(); + + let io = match io_dispatch.get(token) { + Some(io) => io, + None => return, + }; + + if aba_guard != io.aba_guard { + return; + } + + io.readiness.fetch_or(ready.as_usize(), Relaxed); + + if ready.is_writable() || platform::is_hup(ready) { + wr = io.writer.take_waker(); + } + + if !(ready & (!mio::Ready::writable())).is_empty() { + rd = io.reader.take_waker(); + } + } + + if let Some(w) = rd { + w.wake(); + } + + if let Some(w) = wr { + w.wake(); + } + } +} + +#[cfg(all(unix, not(target_os = "fuchsia")))] +impl AsRawFd for Reactor { + fn as_raw_fd(&self) -> RawFd { + self.inner.io.as_raw_fd() + } +} + +impl Park for Reactor { + type Unpark = Handle; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.handle() + } + + fn park(&mut self) -> io::Result<()> { + self.turn(None)?; + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { + self.turn(Some(duration))?; + Ok(()) + } +} + +impl fmt::Debug for Reactor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Reactor") + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current reactor + /// + /// # Panics + /// + /// This function panics if there is no current reactor set. + pub(super) fn current() -> Self { + CURRENT_REACTOR.with(|current| match *current.borrow() { + Some(ref handle) => handle.clone(), + None => panic!("no current reactor"), + }) + } + + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + fn wakeup(&self) { + if let Some(inner) = self.inner() { + inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + } + } + + pub(super) fn inner(&self) -> Option<Arc<Inner>> { + self.inner.upgrade() + } +} + +impl Unpark for Handle { + fn unpark(&self) { + self.wakeup(); + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Register an I/O resource with the reactor. + /// + /// The registration token is returned. + pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<usize> { + // Get an ABA guard value + let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed); + + let key = { + // Block to contain the write lock + let mut io_dispatch = self.io_dispatch.write().unwrap(); + + if io_dispatch.len() == MAX_SOURCES { + return Err(io::Error::new( + io::ErrorKind::Other, + "reactor at max \ + registered I/O resources", + )); + } + + io_dispatch.insert(ScheduledIo { + aba_guard, + readiness: AtomicUsize::new(0), + reader: AtomicWaker::new(), + writer: AtomicWaker::new(), + }) + }; + + let token = aba_guard | key; + + self.io.register( + source, + mio::Token(token), + mio::Ready::all(), + mio::PollOpt::edge(), + )?; + + Ok(key) + } + + /// Deregisters an I/O resource from the reactor. + pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { + self.io.deregister(source) + } + + pub(super) fn drop_source(&self, token: usize) { + self.io_dispatch.write().unwrap().remove(token); + } + + /// Registers interest in the I/O resource associated with `token`. + pub(super) fn register(&self, token: usize, dir: Direction, w: Waker) { + let io_dispatch = self.io_dispatch.read().unwrap(); + let sched = io_dispatch.get(token).unwrap(); + + let (waker, ready) = match dir { + Direction::Read => (&sched.reader, !mio::Ready::writable()), + Direction::Write => (&sched.writer, mio::Ready::writable()), + }; + + waker.register(w); + + if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { + waker.wake(); + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + // When a reactor is dropped it needs to wake up all blocked tasks as + // they'll never receive a notification, and all connected I/O objects + // will start returning errors pretty quickly. + let io = self.io_dispatch.read().unwrap(); + for (_, io) in io.iter() { + io.writer.wake(); + io.reader.wake(); + } + } +} + +impl Direction { + pub(super) fn mask(self) -> mio::Ready { + match self { + Direction::Read => { + // Everything except writable is signaled through read. + mio::Ready::all() - mio::Ready::writable() + } + Direction::Write => mio::Ready::writable() | platform::hup(), + } + } +} |