diff options
author | Carl Lerche <me@carllerche.com> | 2019-10-25 12:50:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-25 12:50:15 -0700 |
commit | 227533d456fe32e48ffcd3796f1e6c8f9318b230 (patch) | |
tree | 498029aaf42dd64eeb8ef0e7d7f29802b45d4e95 /tokio/src | |
parent | 03a9378297c73c2e56a6d6b55db22b92427b850a (diff) |
net: move into tokio crate (#1683)
A step towards collapsing Tokio sub crates into a single `tokio`
crate (#1318).
The `net` implementation is now provided by the main `tokio` crate.
Functionality can be opted out of by using the various net related
feature flags.
Diffstat (limited to 'tokio/src')
44 files changed, 7832 insertions, 82 deletions
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b8682c52..814f2a59 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -26,7 +26,7 @@ //! //! Guide level documentation is found on the [website]. //! -//! [driver]: tokio_net::driver +//! [driver]: tokio::net::driver //! [website]: https://tokio.rs/docs/ //! //! # Examples @@ -82,23 +82,34 @@ macro_rules! if_runtime { #[cfg(feature = "timer")] pub mod clock; + #[cfg(feature = "codec")] pub mod codec; + #[cfg(feature = "fs")] pub mod fs; + pub mod future; + #[cfg(feature = "io")] pub mod io; -#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] + +#[cfg(feature = "net-driver")] pub mod net; + pub mod prelude; + #[cfg(feature = "process")] pub mod process; + #[cfg(feature = "signal")] pub mod signal; + pub mod stream; + #[cfg(feature = "sync")] pub mod sync; + #[cfg(feature = "timer")] pub mod timer; @@ -113,6 +124,7 @@ if_runtime! { #[cfg(feature = "macros")] #[doc(inline)] pub use tokio_macros::main; + #[cfg(feature = "macros")] #[doc(inline)] pub use tokio_macros::test; diff --git a/tokio/src/net.rs b/tokio/src/net.rs deleted file mode 100644 index 6aa1e499..00000000 --- a/tokio/src/net.rs +++ /dev/null @@ -1,67 +0,0 @@ -//! TCP/UDP/Unix bindings for `tokio`. -//! -//! This module contains the TCP/UDP/Unix networking types, similar to the standard -//! library, which can be used to implement networking protocols. -//! -//! # Organization -//! -//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP -//! * [`UdpSocket`] provides functionality for communication over UDP -//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a -//! Unix Domain Stream Socket **(available on Unix only)** -//! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication -//! over Unix Domain Datagram Socket **(available on Unix only)** - -//! -//! [`TcpListener`]: struct.TcpListener.html -//! [`TcpStream`]: struct.TcpStream.html -//! [`UdpSocket`]: struct.UdpSocket.html -//! [`UnixListener`]: struct.UnixListener.html -//! [`UnixStream`]: struct.UnixStream.html -//! [`UnixDatagram`]: struct.UnixDatagram.html -//! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html - -#[cfg(feature = "tcp")] -pub mod tcp { - //! TCP bindings for `tokio`. - //! - //! Connecting to an address, via TCP, can be done using [`TcpStream`]'s - //! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture` - //! implements a future which returns a `TcpStream`. - //! - //! To listen on an address [`TcpListener`] can be used. `TcpListener`'s - //! [`incoming`][incoming_method] method can be used to accept new connections. - //! It return the [`Incoming`] struct, which implements a stream which returns - //! `TcpStream`s. - //! - //! [`TcpStream`]: struct.TcpStream.html - //! [`connect`]: struct.TcpStream.html#method.connect - //! [`ConnectFuture`]: struct.ConnectFuture.html - //! [`TcpListener`]: struct.TcpListener.html - //! [incoming_method]: struct.TcpListener.html#method.incoming - //! [`Incoming`]: struct.Incoming.html - pub use tokio_net::tcp::{split, Incoming, TcpListener, TcpStream}; -} -#[cfg(feature = "tcp")] -pub use self::tcp::{TcpListener, TcpStream}; - -#[cfg(feature = "udp")] -pub mod udp { - //! UDP bindings for `tokio`. - //! - //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket. - //! - //! [`UdpSocket`]: struct.UdpSocket.html - pub use tokio_net::udp::{split, UdpSocket}; -} -#[cfg(feature = "udp")] -pub use self::udp::UdpSocket; - -#[cfg(all(unix, feature = "uds"))] -pub mod unix { - //! Unix domain socket bindings for `tokio` (only available on unix systems). - - pub use tokio_net::uds::{split, UCred, UnixDatagram, UnixListener, UnixStream}; -} -#[cfg(all(unix, feature = "uds"))] -pub use self::unix::{UnixDatagram, UnixListener, UnixStream}; diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs new file mode 100644 index 00000000..8b782cfa --- /dev/null +++ b/tokio/src/net/addr.rs @@ -0,0 +1,214 @@ +use tokio_executor::blocking; + +use futures_util::future; +use std::io; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + +/// Convert or resolve without blocking to one or more `SocketAddr` values. +/// +/// Currently, this trait is only used as an argument to Tokio functions that +/// need to reference a target socket address. +/// +/// This trait is sealed and is intended to be opaque. Users of Tokio should +/// only use `ToSocketAddrs` in trait bounds and __must not__ attempt to call +/// the functions directly or reference associated types. Changing these is not +/// considered a breaking change. +pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {} + +type ReadyFuture<T> = future::Ready<io::Result<T>>; + +// ===== impl SocketAddr ===== + +impl ToSocketAddrs for SocketAddr {} + +impl sealed::ToSocketAddrsPriv for SocketAddr { + type Iter = std::option::IntoIter<SocketAddr>; + type Future = ReadyFuture<Self::Iter>; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(*self).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl str ===== + +impl ToSocketAddrs for str {} + +impl sealed::ToSocketAddrsPriv for str { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + + // First check if the input parses as a socket address + let res: Result<SocketAddr, _> = self.parse(); + + if let Ok(addr) = res { + return MaybeReady::Ready(Some(addr)); + } + + // Run DNS lookup on the blocking pool + let s = self.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&s) + })) + } +} + +// ===== impl (&str, u16) ===== + +impl ToSocketAddrs for (&'_ str, u16) {} + +impl sealed::ToSocketAddrsPriv for (&'_ str, u16) { + type Iter = sealed::OneOrMore; + type Future = sealed::MaybeReady; + + fn to_socket_addrs(&self) -> Self::Future { + use sealed::MaybeReady; + use std::net::{SocketAddrV4, SocketAddrV6}; + + let (host, port) = *self; + + // try to parse the host as a regular IP address first + if let Ok(addr) = host.parse::<Ipv4Addr>() { + let addr = SocketAddrV4::new(addr, port); + let addr = SocketAddr::V4(addr); + + return MaybeReady::Ready(Some(addr)); + } + + if let Ok(addr) = host.parse::<Ipv6Addr>() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + let addr = SocketAddr::V6(addr); + + return MaybeReady::Ready(Some(addr)); + } + + let host = host.to_owned(); + + MaybeReady::Blocking(blocking::run(move || { + std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port)) + })) + } +} + +// ===== impl (IpAddr, u16) ===== + +impl ToSocketAddrs for (IpAddr, u16) {} + +impl sealed::ToSocketAddrsPriv for (IpAddr, u16) { + type Iter = std::option::IntoIter<SocketAddr>; + type Future = ReadyFuture<Self::Iter>; + + fn to_socket_addrs(&self) -> Self::Future { + let iter = Some(SocketAddr::from(*self)).into_iter(); + future::ready(Ok(iter)) + } +} + +// ===== impl String ===== + +impl ToSocketAddrs for String {} + +impl sealed::ToSocketAddrsPriv for String { + type Iter = <str as sealed::ToSocketAddrsPriv>::Iter; + type Future = <str as sealed::ToSocketAddrsPriv>::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (&self[..]).to_socket_addrs() + } +} + +// ===== impl &'_ impl ToSocketAddrs ===== + +impl<T: ToSocketAddrs + ?Sized> ToSocketAddrs for &'_ T {} + +impl<T> sealed::ToSocketAddrsPriv for &'_ T +where + T: sealed::ToSocketAddrsPriv + ?Sized, +{ + type Iter = T::Iter; + type Future = T::Future; + + fn to_socket_addrs(&self) -> Self::Future { + (**self).to_socket_addrs() + } +} + +pub(crate) mod sealed { + //! The contents of this trait are intended to remain private and __not__ + //! part of the `ToSocketAddrs` public API. The details will change over + //! time. + + use tokio_executor::blocking::Blocking; + + use futures_core::ready; + use std::future::Future; + use std::io; + use std::net::SocketAddr; + use std::option; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::vec; + + #[doc(hidden)] + pub trait ToSocketAddrsPriv { + type Iter: Iterator<Item = SocketAddr> + Send + 'static; + type Future: Future<Output = io::Result<Self::Iter>> + Send + 'static; + + fn to_socket_addrs(&self) -> Self::Future; + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum MaybeReady { + Ready(Option<SocketAddr>), + Blocking(Blocking<io::Result<vec::IntoIter<SocketAddr>>>), + } + + #[doc(hidden)] + #[derive(Debug)] + pub enum OneOrMore { + One(option::IntoIter<SocketAddr>), + More(vec::IntoIter<SocketAddr>), + } + + impl Future for MaybeReady { + type Output = io::Result<OneOrMore>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match *self { + MaybeReady::Ready(ref mut i) => { + let iter = OneOrMore::One(i.take().into_iter()); + Poll::Ready(Ok(iter)) + } + MaybeReady::Blocking(ref mut rx) => { + let res = ready!(Pin::new(rx).poll(cx)).map(OneOrMore::More); + + Poll::Ready(res) + } + } + } + } + + impl Iterator for OneOrMore { + type Item = SocketAddr; + + fn next(&mut self) -> Option<Self::Item> { + match self { + OneOrMore::One(i) => i.next(), + OneOrMore::More(i) => i.next(), + } + } + + fn size_hint(&self) -> (usize, Option<usize>) { + match self { + OneOrMore::One(i) => i.size_hint(), + OneOrMore::More(i) => i.size_hint(), + } + } + } +} diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs new file mode 100644 index 00000000..9079ccc7 --- /dev/null +++ b/tokio/src/net/driver/mod.rs @@ -0,0 +1,133 @@ +//! Event loop that drives Tokio I/O resources. +//! +//! This module contains [`Reactor`], which is the event loop that drives all +//! Tokio I/O resources. It is the reactor's job to receive events from the +//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to +//! waiting tasks. It is the bridge between operating system and the futures +//! model. +//! +//! # Overview +//! +//! When using Tokio, all operations are asynchronous and represented by +//! futures. These futures, representing the application logic, are scheduled by +//! an executor (see [runtime model] for more details). Executors wait for +//! notifications before scheduling the future for execution time, i.e., nothing +//! happens until an event is received indicating that the task can make +//! progress. +//! +//! The reactor receives events from the operating system and notifies the +//! executor. +//! +//! Let's start with a basic example, establishing a TCP connection. +//! +//! ``` +//! use tokio::net::TcpStream; +//! +//! # async fn process<T>(_t: T) {} +//! +//! # #[tokio::main] +//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +//! let stream = TcpStream::connect("93.184.216.34:9243").await?; +//! +//! println!("successfully connected"); +//! +//! process(stream).await; +//! # Ok(()) +//! # } +//! ``` +//! +//! Establishing a TCP connection usually cannot be completed immediately. +//! [`TcpStream::connect`] does not block the current thread. Instead, it +//! returns a [future][connect-future] that resolves once the TCP connection has +//! been established. The connect future itself has no way of knowing when the +//! TCP connection has been established. +//! +//! Before returning the future, [`TcpStream::connect`] registers the socket +//! with a reactor. This registration process, handled by [`Registration`], is +//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point, +//! the reactor starts listening for connection events from the operating system +//! for that socket. +//! +//! Once the connect future is passed to [`tokio::run`], it is spawned onto a +//! thread pool. The thread pool waits until it is notified that the connection +//! has completed. +//! +//! When the TCP connection is established, the reactor receives an event from +//! the operating system. It then notifies the thread pool, telling it that the +//! connect future can complete. At this point, the thread pool will schedule +//! the task to run on one of its worker threads. This results in the `and_then` +//! closure to get executed. +//! +//! ## Eager registration +//! +//! Notice how the snippet does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with the current +//! reactor, but no reactor is specified. This works because a reactor +//! instance is automatically made available when using the Tokio [runtime], +//! which is done using [`tokio::main`]. The Tokio runtime's executor sets a +//! thread-local variable referencing the associated [`Reactor`] instance and +//! [`Handle::current`] (used by [`Registration`]) returns the reference. +//! +//! ## Implementation +//! +//! The reactor implementation uses [`mio`] to interface with the operating +//! system's event queue. A call to [`Reactor::poll`] results in a single +//! call to [`Poll::poll`] which in turn results in a single call to the +//! operating system's selector. +//! +//! The reactor maintains state for each registered I/O resource. This tracks +//! the executor task to notify when events are provided by the operating +//! system's selector. This state is stored in a `Sync` data structure and +//! referenced by [`Registration`]. When the [`Registration`] instance is +//! dropped, this state is cleaned up. Because the state is stored in a `Sync` +//! data structure, the [`Registration`] instance is able to be moved to other +//! threads. +//! +//! By default, a runtime's default reactor runs on a background thread. This +//! ensures that application code cannot significantly impact the reactor's +//! responsiveness. +//! +//! ## Integrating with the reactor +//! +//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that +//! automatically integrate with the reactor. However, library authors or +//! applications may wish to implement their own resources that are also backed +//! by the reactor. +//! +//! There are a couple of ways to do this. +//! +//! If the custom I/O resource implements [`mio::Evented`] and implements +//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the +//! most suited. +//! +//! Otherwise, [`Registration`] can be used directly. This provides the lowest +//! level primitive needed for integrating with the reactor: a stream of +//! readiness events. +//! +//! [`Reactor`]: struct.Reactor.html +//! [`Registration`]: struct.Registration.html +//! [runtime model]: https://tokio.rs/docs/internals/runtime-model/ +//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html +//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 +//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx +//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect +//! [`connect`]: ../net/struct.TcpStream.html#method.connect +//! [connect-future]: ../net/struct.ConnectFuture.html +//! [`tokio::run`]: ../runtime/fn.run.html +//! [`TcpStream`]: ../net/struct.TcpStream.html +//! [runtime]: ../runtime +//! [`Handle::current`]: struct.Handle.html#method.current +//! [`mio`]: https://github.com/carllerche/mio +//! [`Reactor::poll`]: struct.Reactor.html#method.poll +//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll +//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html +//! [`PollEvented`]: struct.PollEvented.html +//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + +pub(crate) mod platform; +mod reactor; +mod registration; + +pub use self::reactor::{set_default, DefaultGuard, Handle, Reactor}; +pub use self::registration::Registration; diff --git a/tokio/src/net/driver/platform.rs b/tokio/src/net/driver/platform.rs new file mode 100644 index 00000000..4cfe7345 --- /dev/null +++ b/tokio/src/net/driver/platform.rs @@ -0,0 +1,28 @@ +pub(crate) use self::sys::*; + +#[cfg(unix)] +mod sys { + use mio::unix::UnixReady; + use mio::Ready; + + pub(crate) fn hup() -> Ready { + UnixReady::hup().into() + } + + pub(crate) fn is_hup(ready: Ready) -> bool { + UnixReady::from(ready).is_hup() + } +} + +#[cfg(windows)] +mod sys { + use mio::Ready; + + pub(crate) fn hup() -> Ready { + Ready::empty() + } + + pub(crate) fn is_hup(_: Ready) -> bool { + false + } +} diff --git a/tokio/src/net/driver/reactor.rs b/tokio/src/net/driver/reactor.rs new file mode 100644 index 00000000..384abe47 --- /dev/null +++ b/tokio/src/net/driver/reactor.rs @@ -0,0 +1,451 @@ +use super::platform; + +use tokio_executor::park::{Park, Unpark}; +use tokio_sync::AtomicWaker; + +use mio::event::Evented; +use slab::Slab; +use std::cell::RefCell; +use std::io; +use std::marker::PhantomData; +#[cfg(all(unix, not(target_os = "fuchsia")))] +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::{Arc, RwLock, Weak}; +use std::task::Waker; +use std::time::Duration; +use std::{fmt, usize}; + +/// The core reactor, or event loop. +/// +/// The event loop is the main source of blocking in an application which drives +/// all other I/O events and notifications happening. Each event loop can have +/// multiple handles pointing to it, each of which can then be used to create +/// various I/O objects to interact with the event loop in interesting ways. +pub struct Reactor { + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// State shared between the reactor and the handles. + inner: Arc<Inner>, + + _wakeup_registration: mio::Registration, +} + +/// A reference to a reactor. +/// +/// A `Handle` is used for associating I/O objects with an event loop +/// explicitly. Typically though you won't end up using a `Handle` that often +/// and will instead use the default reactor for the execution context. +#[derive(Clone)] +pub struct Handle { + inner: Weak<Inner>, +} + +/// Return value from the `turn` method on `Reactor`. +/// +/// Currently this value doesn't actually provide any functionality, but it may +/// in the future give insight into what happened during `turn`. +#[derive(Debug)] +pub struct Turn { + _priv: (), +} + +pub(super) struct Inner { + /// The underlying system event queue. + io: mio::Poll, + + /// ABA guard counter + next_aba_guard: AtomicUsize, + + /// Dispatch slabs for I/O and futures events + pub(super) io_dispatch: RwLock<Slab<ScheduledIo>>, + + /// Used to wake up the reactor from a call to `turn` + wakeup: mio::SetReadiness, +} + +pub(super) struct ScheduledIo { + aba_guard: usize, + pub(super) readiness: AtomicUsize, + pub(super) reader: AtomicWaker, + pub(super) writer: AtomicWaker, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(super) enum Direction { + Read, + Write, +} + +thread_local! { + /// Tracks the reactor for the current execution context. + static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None) +} + +const TOKEN_SHIFT: usize = 22; + +// Kind of arbitrary, but this reserves some token space for later usage. +const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1; +const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES); + +fn _assert_kinds() { + fn _assert<T: Send + Sync>() {} + + _assert::<Handle>(); +} + +// ===== impl Reactor ===== + +#[derive(Debug)] +/// Guard that resets current reactor on drop. +pub struct DefaultGuard<'a> { + _lifetime: PhantomData<&'a u8>, +} + +impl Drop for DefaultGuard<'_> { + fn drop(&mut self) { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + *current = None; + }); + } +} + +/// Sets handle for a default reactor, returning guard that unsets it on drop. +pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { + CURRENT_REACTOR.with(|current| { + let mut current = current.borrow_mut(); + + assert!( + current.is_none(), + "default Tokio reactor already set \ + for execution context" + ); + + *current = Some(handle.clone()); + }); + + DefaultGuard { + _lifetime: PhantomData, + } +} + +impl Reactor { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub fn new() -> io::Result<Reactor> { + let io = mio::Poll::new()?; + let wakeup_pair = mio::Registration::new2(); + + io.register( + &wakeup_pair.0, + TOKEN_WAKEUP, + mio::Ready::readable(), + mio::PollOpt::level(), + )?; + + Ok(Reactor { + events: mio::Events::with_capacity(1024), + _wakeup_registration: wakeup_pair.0, + inner: Arc::new(Inner { + io, + next_aba_guard: AtomicUsize::new(0), + io_dispatch: RwLock::new(Slab::with_capacity(1)), + wakeup: wakeup_pair.1, + }), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + /// + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. + pub fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + /// Performs one iteration of the event loop, blocking on waiting for events + /// for at most `max_wait` (forever if `None`). + /// + /// This method is the primary method of running this reactor and processing + /// I/O events that occur. This method executes one iteration of an event + /// loop, blocking at most once waiting for events to happen. + /// + /// If a `max_wait` is specified then the method should block no longer than + /// the duration specified, but this shouldn't be used as a super-precise + /// timer but rather a "ballpark approximation" + /// + /// # Return value + /// + /// This function returns an instance of `Turn` + /// + /// `Turn` as of today has no extra information with it and can be safely + /// discarded. In the future `Turn` may contain information about what + /// happened while this reactor blocked. + /// + /// # Errors + /// + /// This function may also return any I/O error which occurs when polling + /// for readiness of I/O objects with the OS. This is quite unlikely to + /// arise and typically mean that things have gone horribly wrong at that + /// point. Currently this is primarily only known to happen for internal + /// bugs to `tokio` itself. + pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> { + self.poll(max_wait)?; + Ok(Turn { _priv: () }) + } + + /// Returns true if the reactor is currently idle. + /// + /// Idle is defined as all tasks that have been spawned have completed, + /// either successfully or with an error. + pub fn is_idle(&self) -> bool { + |