diff options
author | Carl Lerche <me@carllerche.com> | 2019-08-15 15:04:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-15 15:04:21 -0700 |
commit | f1f61a3b15d17767b12bfd8c0c5712db1b089b0b (patch) | |
tree | dd9cd51fee4b0b33bcbd079d1e96b4009e8f5fe8 /tokio-net | |
parent | d0a8e5d6f2921fadc51a9612f6fe558e4213560f (diff) |
net: reorganize crate in anticipation of #1264 (#1453)
Space is made to add `tcp`, `udp`, `uds`, ... modules.
Diffstat (limited to 'tokio-net')
-rw-r--r-- | tokio-net/src/driver/mod.rs | 140 | ||||
-rw-r--r-- | tokio-net/src/driver/platform.rs | 28 | ||||
-rw-r--r-- | tokio-net/src/driver/reactor.rs | 532 | ||||
-rw-r--r-- | tokio-net/src/driver/registration.rs (renamed from tokio-net/src/registration.rs) | 6 | ||||
-rw-r--r-- | tokio-net/src/driver/sharded_rwlock.rs (renamed from tokio-net/src/sharded_rwlock.rs) | 0 | ||||
-rw-r--r-- | tokio-net/src/lib.rs | 569 | ||||
-rw-r--r-- | tokio-net/src/util/mod.rs | 5 | ||||
-rw-r--r-- | tokio-net/src/util/poll_evented.rs (renamed from tokio-net/src/poll_evented.rs) | 11 |
8 files changed, 715 insertions, 576 deletions
diff --git a/tokio-net/src/driver/mod.rs b/tokio-net/src/driver/mod.rs new file mode 100644 index 00000000..2ba54d9a --- /dev/null +++ b/tokio-net/src/driver/mod.rs @@ -0,0 +1,140 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! This module contains [`Reactor`], which is the event loop that drives all +//! Tokio I/O resources. It is the reactor's job to receive events from the +//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to +//! waiting tasks. It is the bridge between operating system and the futures +//! model. +//! +//! # Overview +//! +//! When using Tokio, all operations are asynchronous and represented by +//! futures. These futures, representing the application logic, are scheduled by +//! an executor (see [runtime model] for more details). Executors wait for +//! notifications before scheduling the future for execution time, i.e., nothing +//! happens until an event is received indicating that the task can make +//! progress. +//! +//! The reactor receives events from the operating system and notifies the +//! executor. +//! +//! Let's start with a basic example, establishing a TCP connection. +//! +//! ``` +//! #![feature(async_await)] +//! +//! use tokio::net::TcpStream; +//! +//! # async fn process<T>(t: T) {} +//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +//! let addr = "93.184.216.34:9243".parse()?; +//! +//! let stream = TcpStream::connect(&addr).await?; +//! +//! println!("successfully connected"); +//! +//! process(stream).await; +//! # Ok(()) +//! # } +//! ``` +//! +//! Establishing a TCP connection usually cannot be completed immediately. +//! [`TcpStream::connect`] does not block the current thread. Instead, it +//! returns a [future][connect-future] that resolves once the TCP connection has +//! been established. The connect future itself has no way of knowing when the +//! TCP connection has been established. +//! +//! Before returning the future, [`TcpStream::connect`] registers the socket +//! with a reactor. This registration process, handled by [`Registration`], is +//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point, +//! the reactor starts listening for connection events from the operating system +//! for that socket. +//! +//! Once the connect future is passed to [`tokio::run`], it is spawned onto a +//! thread pool. The thread pool waits until it is notified that the connection +//! has completed. +//! +//! When the TCP connection is established, the reactor receives an event from +//! the operating system. It then notifies the thread pool, telling it that the +//! connect future can complete. At this point, the thread pool will schedule +//! the task to run on one of its worker threads. This results in the `and_then` +//! closure to get executed. +//! +//! ## Lazy registration +//! +//! Notice how the snippet above does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with a reactor, +//! but no reactor is specified. This works because the registration process +//! mentioned above is actually lazy. It doesn't *actually* happen in the +//! [`connect`] function. Instead, the registration is established the first +//! time that the task is polled (again, see [runtime model]). +//! +//! A reactor instance is automatically made available when using the Tokio +//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor +//! sets a thread-local variable referencing the associated [`Reactor`] instance +//! and [`Handle::current`] (used by [`Registration`]) returns the reference. +//! +//! ## Implementation +//! +//! The reactor implementation uses [`mio`] to interface with the operating +//! system's event queue. A call to [`Reactor::poll`] results in a single +//! call to [`Poll::poll`] which in turn results in a single call to the +//! operating system's selector. +//! +//! The reactor maintains state for each registered I/O resource. This tracks +//! the executor task to notify when events are provided by the operating +//! system's selector. This state is stored in a `Sync` data structure and +//! referenced by [`Registration`]. When the [`Registration`] instance is +//! dropped, this state is cleaned up. Because the state is stored in a `Sync` +//! data structure, the [`Registration`] instance is able to be moved to other +//! threads. +//! +//! By default, a runtime's default reactor runs on a background thread. This +//! ensures that application code cannot significantly impact the reactor's +//! responsiveness. +//! +//! ## Integrating with the reactor +//! +//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that +//! automatically integrate with the reactor. However, library authors or +//! applications may wish to implement their own resources that are also backed +//! by the reactor. +//! +//! There are a couple of ways to do this. +//! +//! If the custom I/O resource implements [`mio::Evented`] and implements +//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the +//! most suited. +//! +//! Otherwise, [`Registration`] can be used directly. This provides the lowest +//! level primitive needed for integrating with the reactor: a stream of +//! readiness events. +//! +//! [`Reactor`]: struct.Reactor.html +//! [`Registration`]: struct.Registration.html +//! [runtime model]: https://tokio.rs/docs/internals/runtime-model/ +//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html +//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx +//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect +//! [`connect`]: ../net/struct.TcpStream.html#method.connect +//! [connect-future]: ../net/struct.ConnectFuture.html +//! [`tokio::run`]: ../runtime/fn.run.html +//! [`TcpStream`]: ../net/struct.TcpStream.html +//! [runtime]: ../runtime +//! [`Handle::current`]: struct.Handle.html#method.current +//! [`mio`]: https://github.com/carllerche/mio +//! [`Reactor::poll`]: struct.Reactor.html#method.poll +//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll +//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html +//! [`PollEvented`]: struct.PollEvented.html +//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + +pub(crate) mod platform; +mod reactor; +mod registration; +mod sharded_rwlock; + +pub use self::reactor::{set_default, Handle, Reactor}; +pub use self::registration::Registration; diff --git a/tokio-net/src/driver/platform.rs b/tokio-net/src/driver/platform.rs new file mode 100644 index 00000000..4cfe7345 --- /dev/null +++ b/tokio-net/src/driver/platform.rs @@ -0,0 +1,28 @@ +pub(crate) use self::sys::*; + +#[cfg(unix)] +mod sys { + use mio::unix::UnixReady; + use mio::Ready; + + pub(crate) fn hup() -> Ready { + UnixReady::hup().into() + } + + pub(crate) fn is_hup(ready: Ready) -> bool { + UnixReady::from(ready).is_hup() + } +} + +#[cfg(windows)] +mod sys { + use mio::Ready; + + pub(crate) fn hup() -> Ready { + Ready::empty() + } + + pub(crate) fn is_hup(_: Ready) -> bool { + false + } +} diff --git a/tokio-net/src/driver/reactor.rs b/tokio-net/src/driver/reactor.rs new file mode 100644 index 00000000..2aa60374 --- /dev/null +++ b/tokio-net/src/driver/reactor.rs @@ -0,0 +1,532 @@ +use super::platform; +use super::sharded_rwlock::RwLock; + +use tokio_executor::park::{Park, Unpark}; +use tokio_sync::AtomicWaker; + +use log::{debug, log_enabled, trace, Level}; +use mio::event::Evented; +use slab::Slab; +use std::cell::RefCell; +use std::io; +use std::marker::PhantomData; +#[cfg(all(unix, not(target_os = "fuchsia")))] +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::{Arc, Weak}; +use std::task::Waker; +use std::time::{Duration, Instant}; +use std::{fmt, usize}; + +/// The core reactor, or event loop. +/// +/// The event loop is the main source of blocking in an application which drives +/// all other I/O events and notifications happening. Each event loop can have +/// multiple handles pointing to it, each of which can then be used to create +/// various I/O objects to interact with the event loop in interesting ways. +pub struct Reactor { + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// State shared between the reactor and the handles. + inner: Arc<Inner>, + + _wakeup_registration: mio::Registration, +} + +/// A reference to a reactor. +/// +/// A `Handle` is used for associating I/O objects with an event loop +/// explicitly. Typically though you won't end up using a `Handle` that often +/// and will instead use the default reactor for the execution context. +/// +/// By default, most components bind lazily to reactors. +/// To get this behavior when manually passing a `Handle`, use `default()`. +#[derive(Clone)] +pub struct Handle { + inner: Option<HandlePriv>, +} + +/// Like `Handle`, but never `None`. +#[derive(Clone)] +pub(crate) struct HandlePriv { + inner: Weak<Inner>, +} + +/// Return value from the `turn` method on `Reactor`. +/// +/// Currently this value doesn't actually provide any functionality, but it may +/// in the future give insight into what happened during `turn`. +#[derive(Debug)] +pub struct Turn { + _priv: (), +} + +#[test] +fn test_handle_size() { + use std::mem; + assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>()); +} + +pub(super) struct Inner { + /// The underlying system event queue. + io: mio::Poll, + + /// ABA guard counter + next_aba_guard: AtomicUsize, + + /// Dispatch slabs for I/O and futures events + pub(super) io_dispatch: RwLock<Slab<ScheduledIo>>, + + /// Used to wake up the reactor from a call to `turn` + wakeup: mio::SetReadiness, +} + +pub(super) struct ScheduledIo { + aba_guard: usize, + pub(super) readiness: AtomicUsize, + pub(super) reader: AtomicWaker, + pub(super) writer: AtomicWaker, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(super) enum Direction { + Read, + Write, +} + +thread_local! { + /// Tracks the reactor for the current execution context. + static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None) +} + +const TOKEN_SHIFT: usize = 22; + +// Kind of arbitrary, but this reserves some token space for later usage. +const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; +const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +// ===== impl Reactor ===== + +#[derive(Debug)] +///Guard that resets current reactor on drop. +pub struct DefaultGuard<'a> { + _lifetime: PhantomData<&'a u8>, +} + +impl Drop for DefaultGuard<'_> { + fn drop(&mut self) { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + *current = None; + }); + } +} + +///Sets handle for a default reactor, returning guard that unsets it on drop. +pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + + assert!( + current.is_none(), + "default Tokio reactor already set \ + for execution context" + ); + + let handle = match handle.as_priv() { + Some(handle) => handle, + None => { + panic!("`handle` does not reference a reactor"); + } + }; + + *current = Some(handle.clone()); + }); + + DefaultGuard { + _lifetime: PhantomData, + } +} + +impl Reactor { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub fn new() -> io::Result<Reactor> { + let io = mio::Poll::new()?; + let wakeup_pair = mio::Registration::new2(); + + io.register( + &wakeup_pair.0, + TOKEN_WAKEUP, + mio::Ready::readable(), + mio::PollOpt::level(), + )?; + + Ok(Reactor { + events: mio::Events::with_capacity(1024), + _wakeup_registration: wakeup_pair.0, + inner: Arc::new(Inner { + io, + next_aba_guard: AtomicUsize::new(0), + io_dispatch: RwLock::new(Slab::with_capacity(1)), + wakeup: wakeup_pair.1, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub fn handle(&self) -> Handle { + Handle { + inner: Some(HandlePriv { + inner: Arc::downgrade(&self.inner), + }), + } + } + + /// Performs one iteration of the event loop, blocking on waiting for events + /// for at most `max_wait` (forever if `None`). + /// + /// This method is the primary method of running this reactor and processing + /// I/O events that occur. This method executes one iteration of an event + /// loop, blocking at most once waiting for events to happen. + /// + /// If a `max_wait` is specified then the method should block no longer than + /// the duration specified, but this shouldn't be used as a super-precise + /// timer but rather a "ballpark approximation" + /// + /// # Return value + /// + /// This function returns an instance of `Turn` + /// + /// `Turn` as of today has no extra information with it and can be safely + /// discarded. In the future `Turn` may contain information about what + /// happened while this reactor blocked. + /// + /// # Errors + /// + /// This function may also return any I/O error which occurs when polling + /// for readiness of I/O objects with the OS. This is quite unlikely to + /// arise and typically mean that things have gone horribly wrong at that + /// point. Currently this is primarily only known to happen for internal + /// bugs to `tokio` itself. + pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> { + self.poll(max_wait)?; + Ok(Turn { _priv: () }) + } + + /// Returns true if the reactor is currently idle. + /// + /// Idle is defined as all tasks that have been spawned have completed, + /// either successfully or with an error. + pub fn is_idle(&self) -> bool { + self.inner.io_dispatch.read().is_empty() + } + + fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> { + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.inner.io.poll(&mut self.events, max_wait) { + Ok(_) => {} + Err(e) => return Err(e), + } + + let start = if log_enabled!(Level::Debug) { + Some(Instant::now()) + } else { + None + }; + + // Process all the events that came in, dispatching appropriately + let mut events = 0; + for event in self.events.iter() { + events += 1; + let token = event.token(); + trace!("event {:?} {:?}", event.readiness(), event.token()); + + if token == TOKEN_WAKEUP { + self.inner + .wakeup + .set_readiness(mio::Ready::empty()) + .unwrap(); + } else { + self.dispatch(token, event.readiness()); + } + } + + if let Some(start) = start { + let dur = start.elapsed(); + trace!( + "loop process - {} events, {}.{:03}s", + events, + dur.as_secs(), + dur.subsec_millis() + ); + } + + Ok(()) + } + + fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + let aba_guard = token.0 & !MAX_SOURCES; + let token = token.0 & MAX_SOURCES; + + let mut rd = None; + let mut wr = None; + + // Create a scope to ensure that notifying the tasks stays out of the + // lock's critical section. + { + let io_dispatch = self.inner.io_dispatch.read(); + + let io = match io_dispatch.get(token) { + Some(io) => io, + None => return, + }; + + if aba_guard != io.aba_guard { + return; + } + + io.readiness.fetch_or(ready.as_usize(), Relaxed); + + if ready.is_writable() || platform::is_hup(ready) { + wr = io.writer.take_waker(); + } + + if !(ready & (!mio::Ready::writable())).is_empty() { + rd = io.reader.take_waker(); + } + } + + if let Some(w) = rd { + w.wake(); + } + + if let Some(w) = wr { + w.wake(); + } + } +} + +#[cfg(all(unix, not(target_os = "fuchsia")))] +impl AsRawFd for Reactor { + fn as_raw_fd(&self) -> RawFd { + self.inner.io.as_raw_fd() + } +} + +impl Park for Reactor { + type Unpark = Handle; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.handle() + } + + fn park(&mut self) -> io::Result<()> { + self.turn(None)?; + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { + self.turn(Some(duration))?; + Ok(()) + } +} + +impl fmt::Debug for Reactor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Reactor") + } +} + +// ===== impl Handle ===== + +impl Handle { + #[doc(hidden)] + #[deprecated(note = "semantics were sometimes surprising, use Handle::default()")] + pub fn current() -> Handle { + // TODO: Should this panic on error? + HandlePriv::try_current() + .map(|handle| Handle { + inner: Some(handle), + }) + .unwrap_or(Handle { + inner: Some(HandlePriv { inner: Weak::new() }), + }) + } + + pub(crate) fn as_priv(&self) -> Option<&HandlePriv> { + self.inner.as_ref() + } +} + +impl Unpark for Handle { + fn unpark(&self) { + if let Some(ref h) = self.inner { + h.wakeup(); + } + } +} + +impl Default for Handle { + /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor. + fn default() -> Handle { + Handle { inner: None } + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl HandlePriv ===== + +impl HandlePriv { + /// Try to get a handle to the current reactor. + /// + /// Returns `Err` if no handle is found. + pub(super) fn try_current() -> io::Result<HandlePriv> { + CURRENT_REACTOR.with(|current| match *current.borrow() { + Some(ref handle) => Ok(handle.clone()), + None => Err(io::Error::new(io::ErrorKind::Other, "no current reactor")), + }) + } + + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + fn wakeup(&self) { + if let Some(inner) = self.inner() { + inner.wakeup.set_readiness(mio::Ready::readable()).unwrap(); + } + } + + pub(super) fn inner(&self) -> Option<Arc<Inner>> { + self.inner.upgrade() + } +} + +impl fmt::Debug for HandlePriv { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "HandlePriv") + } +} + +// ===== impl Inner ===== + +impl Inner { + /// Register an I/O resource with the reactor. + /// + /// The registration token is returned. + pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<usize> { + // Get an ABA guard value + let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed); + + let key = { + // Block to contain the write lock + let mut io_dispatch = self.io_dispatch.write(); + + if io_dispatch.len() == MAX_SOURCES { + return Err(io::Error::new( + io::ErrorKind::Other, + "reactor at max \ + registered I/O resources", + )); + } + + io_dispatch.insert(ScheduledIo { + aba_guard, + readiness: AtomicUsize::new(0), + reader: AtomicWaker::new(), + writer: AtomicWaker::new(), + }) + }; + + let token = aba_guard | key; + debug!("adding I/O source: {}", token); + + self.io.register( + source, + mio::Token(token), + mio::Ready::all(), + mio::PollOpt::edge(), + )?; + + Ok(key) + } + + /// Deregisters an I/O resource from the reactor. + pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { + self.io.deregister(source) + } + + pub(super) fn drop_source(&self, token: usize) { + debug!("dropping I/O source: {}", token); + self.io_dispatch.write().remove(token); + } + + /// Registers interest in the I/O resource associated with `token`. + pub(super) fn register(&self, token: usize, dir: Direction, w: Waker) { + debug!("scheduling {:?} for: {}", dir, token); + let io_dispatch = self.io_dispatch.read(); + let sched = io_dispatch.get(token).unwrap(); + + let (waker, ready) = match dir { + Direction::Read => (&sched.reader, !mio::Ready::writable()), + Direction::Write => (&sched.writer, mio::Ready::writable()), + }; + + waker.register(w); + + if sched.readiness.load(SeqCst) & ready.as_usize() != 0 { + waker.wake(); + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + // When a reactor is dropped it needs to wake up all blocked tasks as + // they'll never receive a notification, and all connected I/O objects + // will start returning errors pretty quickly. + let io = self.io_dispatch.read(); + for (_, io) in io.iter() { + io.writer.wake(); + io.reader.wake(); + } + } +} + +impl Direction { + pub(super) fn mask(self) -> mio::Ready { + match self { + Direction::Read => { + // Everything except writable is signaled through read. + mio::Ready::all() - mio::Ready::writable() + } + Direction::Write => mio::Ready::writable() | platform::hup(), + } + } +} diff --git a/tokio-net/src/registration.rs b/tokio-net/src/driver/registration.rs index d094e218..eb24d8f7 100644 --- a/tokio-net/src/registration.rs +++ b/tokio-net/src/driver/registration.rs @@ -1,4 +1,6 @@ -use crate::{Direction, Handle, HandlePriv}; +use super::platform; +use super::reactor::{Direction, Handle, HandlePriv}; + use log::debug; use mio::{self, Evented}; use std::cell::UnsafeCell; @@ -504,7 +506,7 @@ impl Inner { }; let mask = direction.mask(); - let mask_no_hup = (mask - crate::platform::hup()).as_usize(); + let mask_no_hup = (mask - platform::hup()).as_usize(); let io_dispatch = inner.io_dispatch.read(); let sched = &io_dispatch[self.token]; diff --git a/tokio-net/src/sharded_rwlock.rs b/tokio-net/src/driver/sharded_rwlock.rs index 67892481..67892481 100644 --- a/tokio-net/src/sharded_rwlock.rs +++ b/tokio-net/src/driver/sharded_rwlock.rs diff --git a/tokio-net/src/lib.rs b/tokio-net/src/lib.rs index 09a9d5e7..c1393300 100644 --- a/tokio-net/src/lib.rs +++ b/tokio-net/src/lib.rs @@ -36,570 +36,5 @@ //! [`PollEvented`]: struct.PollEvented.html //! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html -mod poll_evented; -mod registration; -mod sharded_rwlock; - -// ===== Public re-exports ===== - -pub use self::poll_evented::PollEvented; -pub use self::registration::Registration; - -// ===== Private imports ===== - -use crate::sharded_rwlock::RwLock; -use log::{debug, log_enabled, trace, Level}; -use mio::event::Evented; -use slab::Slab; -use std::cell::RefCell; -use std::io; -use std::marker::PhantomData; -#[cfg(all(unix, not(target_os = "fuchsia")))] -use std::os::unix::io::{AsRawFd, RawFd}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::{Arc, Weak}; -use std::task::Waker; -use std::time::{Duration, Instant}; -use std::{fmt, usize}; -use tokio_executor::park::{Park, Unpark}; -use tokio_sync::AtomicWaker; - -/// The core reactor, or event loop. -/// -/// The event loop is the main source of blocking in an application which drives -/// all other I/O events and notifications happening. Each event loop can have -/// multiple handles pointing to it, each of which can then be used to create -/// various I/O objects to interact with the event loop in interesting ways. -pub struct Reactor { - /// Reuse the `mio::Events` value across calls to poll. - events: mio::Events, - - /// State shared between the reactor and the handles. - inner: Arc<Inner>, - - _wakeup_registration: mio::Registration, -} - -/// A reference to a reactor. -/// -/// A `Handle` is used for associating I/O objects with an event loop -/// explicitly. Typically though you won't end up using a `Handle` that often -/// and will instead use the default reactor for the execution context. -/// -/// By default, most components bind lazily to reactors. -/// To get this behavior when manually passing a `Handle`, use `default()`. -#[derive(Clone)] -pub struct Handle { - inner: Option<HandlePriv>, -} - -/// Like `Handle`, but never `None`. -#[derive(Clone)] -struct HandlePriv { - inner: Weak<Inner>, -} - -/// Return value from the `turn` method on `Reactor`. -/// -/// Currently this value doesn't actually provide any functionality, but it may -/// in the future give insight into what happened during `turn`. -#[derive(Debug)] -pub struct Turn { - _priv: (), -} - -#[test] -fn test_handle_size() { - use std::mem; - assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>()); -} - -struct Inner { - /// The underlying system event queue. - io: mio::Poll, - - /// ABA guard counter - next_aba_guard: AtomicUsize, - - /// Dispatch slabs for I/O and futures events - io_dispatch: RwLock<Slab<ScheduledIo>>, - - /// Used to wake up the reactor from a call to `turn` - wakeup: mio::SetReadiness, -} - -struct ScheduledIo { - aba_guard: usize, - readiness: AtomicUsize, - reader: AtomicWaker, - writer: AtomicWaker, -} - -#[derive(Debug, Eq, PartialEq, Clone, Copy)] -pub(crate) enum Direction { - Read, - Write, -} - -thread_local! { - /// Tracks the reactor for the current execution context. - static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None) -} - -const TOKEN_SHIFT: usize = 22; - -// Kind of arbitrary, but this reserves some token space for later usage. -const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; -const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); - -fn _assert_kinds() { - fn _assert<T: Send + Sync>() {} - - _assert::<Handle>(); -} - -// ===== impl Reactor ===== - -#[derive(Debug)] -///Guard that resets current reactor on drop. -pub struct DefaultGuard<'a> { - _lifetime: PhantomData<&'a u8>, -} - -impl Drop for DefaultGuard<'_> { - fn drop(&mut self) { - CURRENT_REACTOR.with(|current| { - let mut current = current.borrow_mut(); - *current = None; - }); - } -} - -///Sets handle for a default reactor, returning guard that unsets it on drop. -pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { - CURRENT_REACTOR.with(|current| { - let mut current = current.borrow_mut(); - - assert!( - current.is_none(), - "default Tokio reactor already set \ - for execution context" - ); - - let handle = match handle.as_priv() { - Some(handle) => handle, - None => { - panic!("`handle` does not reference a reactor"); - } - }; - - *current = Some(handle.clone()); - }); - - DefaultGuard { - _lifetime: PhantomData, - } -} - -impl Reactor { - /// Creates a new event loop, returning any error that happened during the - /// creation. - pub fn new() -> io::Result<Reactor> { - let io = mio::Poll::new()?; - let wakeup_pair = mio::Registration::new2(); - - io.register( - &wakeup_pair.0, - TOKEN_WAKEUP, - mio::Ready::readable(), - mio::PollOpt::level(), - )?; - - Ok(Reactor { - events: mio::Events::with_capacity(1024), - _wakeup_registration: wakeup_pair.0, - inner: Arc::new(Inner { - io, - next_aba_guard: AtomicUsize::new(0), - io_dispatch: RwLock::new(Slab::with_capacity(1)), - wakeup: wakeup_pair.1, - }), - }) - } - - /// Returns a handle to this event loop which can be sent across threads - /// and can be used as a proxy to the event loop itself. - /// - /// Handles are cloneable and clones always refer to the same event loop. - /// This handle is typically passed into functions that create I/O objects - /// to bind them to this event loop. - pub fn handle(&self) -> Handle { - Handle { - inner: Some(HandlePriv { - inner: Arc::downgrade(&self.inner), - }), - } - } - - /// Performs one iteration of the event loop, blocking on waiting for events - /// for at most `max_wait` (forever if `None`). - /// - /// This method is the primary method of running this reactor and processing - /// I/O events that occur. This method executes one iteration of an event - /// loop, blocking at most once waiting for events to happen. - /// - /// If a `max_wait` is specified then the method should block no longer than - /// the duration specified, but this shouldn't be used as a super-precise - /// timer but rather a "ballpark approximation" - /// - /// # Return value - /// - /// This function returns an instance of `Turn` - /// - /// `Turn` as of today has no extra information with it and can be safely - /// discarded. In the future `Turn` may contain information about what - /// happened while this reactor blocked. - /// - /// # Errors - /// - /// This function may also return any I/O error which occurs when polling - /// for readiness of I/O objects with the OS. This is quite unlikely to - /// arise and typically mean that things have gone horribly wrong at that - /// point. Currently this is primarily only known to happen for internal - /// bugs to `tokio` itself. - pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> { - self.poll(max_wait)?; - Ok(Turn { _priv: () }) - } - - /// Returns true if the reactor is currently idle. - /// - /// Idle is defined as all tasks that have been spawned have completed, - /// either successfully or with an error. - pub fn is_idle(&self) -> bool { - self.inner.io_dispatch.read().is_empty() - } - - fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> { - // Block waiting for an event to happen, peeling out how many events - // happened. - match self.inner.io.poll(&mut self.events, max_wait) { - Ok(_) => {} - Err(e) => return Err(e), - } - - let start = if log_enabled!(Level::Debug) { - Some(Instant::now()) - } else { |