summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-10-25 12:50:15 -0700
committerGitHub <noreply@github.com>2019-10-25 12:50:15 -0700
commit227533d456fe32e48ffcd3796f1e6c8f9318b230 (patch)
tree498029aaf42dd64eeb8ef0e7d7f29802b45d4e95 /tokio/src
parent03a9378297c73c2e56a6d6b55db22b92427b850a (diff)
net: move into tokio crate (#1683)
A step towards collapsing Tokio sub crates into a single `tokio` crate (#1318). The `net` implementation is now provided by the main `tokio` crate. Functionality can be opted out of by using the various net related feature flags.
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/lib.rs16
-rw-r--r--tokio/src/net.rs67
-rw-r--r--tokio/src/net/addr.rs214
-rw-r--r--tokio/src/net/driver/mod.rs133
-rw-r--r--tokio/src/net/driver/platform.rs28
-rw-r--r--tokio/src/net/driver/reactor.rs451
-rw-r--r--tokio/src/net/driver/registration.rs269
-rw-r--r--tokio/src/net/driver/sharded_rwlock.rs217
-rw-r--r--tokio/src/net/mod.rs47
-rw-r--r--tokio/src/net/tcp/incoming.rs31
-rw-r--r--tokio/src/net/tcp/listener.rs358
-rw-r--r--tokio/src/net/tcp/mod.rs29
-rw-r--r--tokio/src/net/tcp/split.rs97
-rw-r--r--tokio/src/net/tcp/stream.rs827
-rw-r--r--tokio/src/net/udp/mod.rs13
-rw-r--r--tokio/src/net/udp/socket.rs419
-rw-r--r--tokio/src/net/udp/split.rs148
-rw-r--r--tokio/src/net/unix/datagram.rs233
-rw-r--r--tokio/src/net/unix/incoming.rs31
-rw-r--r--tokio/src/net/unix/listener.rs135
-rw-r--r--tokio/src/net/unix/mod.rs21
-rw-r--r--tokio/src/net/unix/split.rs91
-rw-r--r--tokio/src/net/unix/stream.rs339
-rw-r--r--tokio/src/net/unix/ucred.rs154
-rw-r--r--tokio/src/net/util/mod.rs4
-rw-r--r--tokio/src/net/util/poll_evented.rs415
-rw-r--r--tokio/src/process.rs2
-rw-r--r--tokio/src/process/kill.rs13
-rw-r--r--tokio/src/process/mod.rs1060
-rw-r--r--tokio/src/process/unix/mod.rs225
-rw-r--r--tokio/src/process/unix/orphan.rs190
-rw-r--r--tokio/src/process/unix/reap.rs334
-rw-r--r--tokio/src/process/windows.rs193
-rw-r--r--tokio/src/runtime/current_thread/builder.rs2
-rw-r--r--tokio/src/runtime/current_thread/runtime.rs2
-rw-r--r--tokio/src/runtime/mod.rs2
-rw-r--r--tokio/src/runtime/threadpool/builder.rs4
-rw-r--r--tokio/src/runtime/threadpool/mod.rs4
-rw-r--r--tokio/src/signal.rs4
-rw-r--r--tokio/src/signal/ctrl_c.rs46
-rw-r--r--tokio/src/signal/mod.rs93
-rw-r--r--tokio/src/signal/registry.rs310
-rw-r--r--tokio/src/signal/unix.rs426
-rw-r--r--tokio/src/signal/windows.rs217
44 files changed, 7832 insertions, 82 deletions
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index b8682c52..814f2a59 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -26,7 +26,7 @@
//!
//! Guide level documentation is found on the [website].
//!
-//! [driver]: tokio_net::driver
+//! [driver]: tokio::net::driver
//! [website]: https://tokio.rs/docs/
//!
//! # Examples
@@ -82,23 +82,34 @@ macro_rules! if_runtime {
#[cfg(feature = "timer")]
pub mod clock;
+
#[cfg(feature = "codec")]
pub mod codec;
+
#[cfg(feature = "fs")]
pub mod fs;
+
pub mod future;
+
#[cfg(feature = "io")]
pub mod io;
-#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
+
+#[cfg(feature = "net-driver")]
pub mod net;
+
pub mod prelude;
+
#[cfg(feature = "process")]
pub mod process;
+
#[cfg(feature = "signal")]
pub mod signal;
+
pub mod stream;
+
#[cfg(feature = "sync")]
pub mod sync;
+
#[cfg(feature = "timer")]
pub mod timer;
@@ -113,6 +124,7 @@ if_runtime! {
#[cfg(feature = "macros")]
#[doc(inline)]
pub use tokio_macros::main;
+
#[cfg(feature = "macros")]
#[doc(inline)]
pub use tokio_macros::test;
diff --git a/tokio/src/net.rs b/tokio/src/net.rs
deleted file mode 100644
index 6aa1e499..00000000
--- a/tokio/src/net.rs
+++ /dev/null
@@ -1,67 +0,0 @@
-//! TCP/UDP/Unix bindings for `tokio`.
-//!
-//! This module contains the TCP/UDP/Unix networking types, similar to the standard
-//! library, which can be used to implement networking protocols.
-//!
-//! # Organization
-//!
-//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP
-//! * [`UdpSocket`] provides functionality for communication over UDP
-//! * [`UnixListener`] and [`UnixStream`] provide functionality for communication over a
-//! Unix Domain Stream Socket **(available on Unix only)**
-//! * [`UnixDatagram`] and [`UnixDatagramFramed`] provide functionality for communication
-//! over Unix Domain Datagram Socket **(available on Unix only)**
-
-//!
-//! [`TcpListener`]: struct.TcpListener.html
-//! [`TcpStream`]: struct.TcpStream.html
-//! [`UdpSocket`]: struct.UdpSocket.html
-//! [`UnixListener`]: struct.UnixListener.html
-//! [`UnixStream`]: struct.UnixStream.html
-//! [`UnixDatagram`]: struct.UnixDatagram.html
-//! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html
-
-#[cfg(feature = "tcp")]
-pub mod tcp {
- //! TCP bindings for `tokio`.
- //!
- //! Connecting to an address, via TCP, can be done using [`TcpStream`]'s
- //! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture`
- //! implements a future which returns a `TcpStream`.
- //!
- //! To listen on an address [`TcpListener`] can be used. `TcpListener`'s
- //! [`incoming`][incoming_method] method can be used to accept new connections.
- //! It return the [`Incoming`] struct, which implements a stream which returns
- //! `TcpStream`s.
- //!
- //! [`TcpStream`]: struct.TcpStream.html
- //! [`connect`]: struct.TcpStream.html#method.connect
- //! [`ConnectFuture`]: struct.ConnectFuture.html
- //! [`TcpListener`]: struct.TcpListener.html
- //! [incoming_method]: struct.TcpListener.html#method.incoming
- //! [`Incoming`]: struct.Incoming.html
- pub use tokio_net::tcp::{split, Incoming, TcpListener, TcpStream};
-}
-#[cfg(feature = "tcp")]
-pub use self::tcp::{TcpListener, TcpStream};
-
-#[cfg(feature = "udp")]
-pub mod udp {
- //! UDP bindings for `tokio`.
- //!
- //! The main struct for UDP is the [`UdpSocket`], which represents a UDP socket.
- //!
- //! [`UdpSocket`]: struct.UdpSocket.html
- pub use tokio_net::udp::{split, UdpSocket};
-}
-#[cfg(feature = "udp")]
-pub use self::udp::UdpSocket;
-
-#[cfg(all(unix, feature = "uds"))]
-pub mod unix {
- //! Unix domain socket bindings for `tokio` (only available on unix systems).
-
- pub use tokio_net::uds::{split, UCred, UnixDatagram, UnixListener, UnixStream};
-}
-#[cfg(all(unix, feature = "uds"))]
-pub use self::unix::{UnixDatagram, UnixListener, UnixStream};
diff --git a/tokio/src/net/addr.rs b/tokio/src/net/addr.rs
new file mode 100644
index 00000000..8b782cfa
--- /dev/null
+++ b/tokio/src/net/addr.rs
@@ -0,0 +1,214 @@
+use tokio_executor::blocking;
+
+use futures_util::future;
+use std::io;
+use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
+
+/// Convert or resolve without blocking to one or more `SocketAddr` values.
+///
+/// Currently, this trait is only used as an argument to Tokio functions that
+/// need to reference a target socket address.
+///
+/// This trait is sealed and is intended to be opaque. Users of Tokio should
+/// only use `ToSocketAddrs` in trait bounds and __must not__ attempt to call
+/// the functions directly or reference associated types. Changing these is not
+/// considered a breaking change.
+pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {}
+
+type ReadyFuture<T> = future::Ready<io::Result<T>>;
+
+// ===== impl SocketAddr =====
+
+impl ToSocketAddrs for SocketAddr {}
+
+impl sealed::ToSocketAddrsPriv for SocketAddr {
+ type Iter = std::option::IntoIter<SocketAddr>;
+ type Future = ReadyFuture<Self::Iter>;
+
+ fn to_socket_addrs(&self) -> Self::Future {
+ let iter = Some(*self).into_iter();
+ future::ready(Ok(iter))
+ }
+}
+
+// ===== impl str =====
+
+impl ToSocketAddrs for str {}
+
+impl sealed::ToSocketAddrsPriv for str {
+ type Iter = sealed::OneOrMore;
+ type Future = sealed::MaybeReady;
+
+ fn to_socket_addrs(&self) -> Self::Future {
+ use sealed::MaybeReady;
+
+ // First check if the input parses as a socket address
+ let res: Result<SocketAddr, _> = self.parse();
+
+ if let Ok(addr) = res {
+ return MaybeReady::Ready(Some(addr));
+ }
+
+ // Run DNS lookup on the blocking pool
+ let s = self.to_owned();
+
+ MaybeReady::Blocking(blocking::run(move || {
+ std::net::ToSocketAddrs::to_socket_addrs(&s)
+ }))
+ }
+}
+
+// ===== impl (&str, u16) =====
+
+impl ToSocketAddrs for (&'_ str, u16) {}
+
+impl sealed::ToSocketAddrsPriv for (&'_ str, u16) {
+ type Iter = sealed::OneOrMore;
+ type Future = sealed::MaybeReady;
+
+ fn to_socket_addrs(&self) -> Self::Future {
+ use sealed::MaybeReady;
+ use std::net::{SocketAddrV4, SocketAddrV6};
+
+ let (host, port) = *self;
+
+ // try to parse the host as a regular IP address first
+ if let Ok(addr) = host.parse::<Ipv4Addr>() {
+ let addr = SocketAddrV4::new(addr, port);
+ let addr = SocketAddr::V4(addr);
+
+ return MaybeReady::Ready(Some(addr));
+ }
+
+ if let Ok(addr) = host.parse::<Ipv6Addr>() {
+ let addr = SocketAddrV6::new(addr, port, 0, 0);
+ let addr = SocketAddr::V6(addr);
+
+ return MaybeReady::Ready(Some(addr));
+ }
+
+ let host = host.to_owned();
+
+ MaybeReady::Blocking(blocking::run(move || {
+ std::net::ToSocketAddrs::to_socket_addrs(&(&host[..], port))
+ }))
+ }
+}
+
+// ===== impl (IpAddr, u16) =====
+
+impl ToSocketAddrs for (IpAddr, u16) {}
+
+impl sealed::ToSocketAddrsPriv for (IpAddr, u16) {
+ type Iter = std::option::IntoIter<SocketAddr>;
+ type Future = ReadyFuture<Self::Iter>;
+
+ fn to_socket_addrs(&self) -> Self::Future {
+ let iter = Some(SocketAddr::from(*self)).into_iter();
+ future::ready(Ok(iter))
+ }
+}
+
+// ===== impl String =====
+
+impl ToSocketAddrs for String {}
+
+impl sealed::ToSocketAddrsPriv for String {
+ type Iter = <str as sealed::ToSocketAddrsPriv>::Iter;
+ type Future = <str as sealed::ToSocketAddrsPriv>::Future;
+
+ fn to_socket_addrs(&self) -> Self::Future {
+ (&self[..]).to_socket_addrs()
+ }
+}
+
+// ===== impl &'_ impl ToSocketAddrs =====
+
+impl<T: ToSocketAddrs + ?Sized> ToSocketAddrs for &'_ T {}
+
+impl<T> sealed::ToSocketAddrsPriv for &'_ T
+where
+ T: sealed::ToSocketAddrsPriv + ?Sized,
+{
+ type Iter = T::Iter;
+ type Future = T::Future;
+
+ fn to_socket_addrs(&self) -> Self::Future {
+ (**self).to_socket_addrs()
+ }
+}
+
+pub(crate) mod sealed {
+ //! The contents of this trait are intended to remain private and __not__
+ //! part of the `ToSocketAddrs` public API. The details will change over
+ //! time.
+
+ use tokio_executor::blocking::Blocking;
+
+ use futures_core::ready;
+ use std::future::Future;
+ use std::io;
+ use std::net::SocketAddr;
+ use std::option;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+ use std::vec;
+
+ #[doc(hidden)]
+ pub trait ToSocketAddrsPriv {
+ type Iter: Iterator<Item = SocketAddr> + Send + 'static;
+ type Future: Future<Output = io::Result<Self::Iter>> + Send + 'static;
+
+ fn to_socket_addrs(&self) -> Self::Future;
+ }
+
+ #[doc(hidden)]
+ #[derive(Debug)]
+ pub enum MaybeReady {
+ Ready(Option<SocketAddr>),
+ Blocking(Blocking<io::Result<vec::IntoIter<SocketAddr>>>),
+ }
+
+ #[doc(hidden)]
+ #[derive(Debug)]
+ pub enum OneOrMore {
+ One(option::IntoIter<SocketAddr>),
+ More(vec::IntoIter<SocketAddr>),
+ }
+
+ impl Future for MaybeReady {
+ type Output = io::Result<OneOrMore>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match *self {
+ MaybeReady::Ready(ref mut i) => {
+ let iter = OneOrMore::One(i.take().into_iter());
+ Poll::Ready(Ok(iter))
+ }
+ MaybeReady::Blocking(ref mut rx) => {
+ let res = ready!(Pin::new(rx).poll(cx)).map(OneOrMore::More);
+
+ Poll::Ready(res)
+ }
+ }
+ }
+ }
+
+ impl Iterator for OneOrMore {
+ type Item = SocketAddr;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ match self {
+ OneOrMore::One(i) => i.next(),
+ OneOrMore::More(i) => i.next(),
+ }
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ match self {
+ OneOrMore::One(i) => i.size_hint(),
+ OneOrMore::More(i) => i.size_hint(),
+ }
+ }
+ }
+}
diff --git a/tokio/src/net/driver/mod.rs b/tokio/src/net/driver/mod.rs
new file mode 100644
index 00000000..9079ccc7
--- /dev/null
+++ b/tokio/src/net/driver/mod.rs
@@ -0,0 +1,133 @@
+//! Event loop that drives Tokio I/O resources.
+//!
+//! This module contains [`Reactor`], which is the event loop that drives all
+//! Tokio I/O resources. It is the reactor's job to receive events from the
+//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to
+//! waiting tasks. It is the bridge between operating system and the futures
+//! model.
+//!
+//! # Overview
+//!
+//! When using Tokio, all operations are asynchronous and represented by
+//! futures. These futures, representing the application logic, are scheduled by
+//! an executor (see [runtime model] for more details). Executors wait for
+//! notifications before scheduling the future for execution time, i.e., nothing
+//! happens until an event is received indicating that the task can make
+//! progress.
+//!
+//! The reactor receives events from the operating system and notifies the
+//! executor.
+//!
+//! Let's start with a basic example, establishing a TCP connection.
+//!
+//! ```
+//! use tokio::net::TcpStream;
+//!
+//! # async fn process<T>(_t: T) {}
+//!
+//! # #[tokio::main]
+//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+//! let stream = TcpStream::connect("93.184.216.34:9243").await?;
+//!
+//! println!("successfully connected");
+//!
+//! process(stream).await;
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! Establishing a TCP connection usually cannot be completed immediately.
+//! [`TcpStream::connect`] does not block the current thread. Instead, it
+//! returns a [future][connect-future] that resolves once the TCP connection has
+//! been established. The connect future itself has no way of knowing when the
+//! TCP connection has been established.
+//!
+//! Before returning the future, [`TcpStream::connect`] registers the socket
+//! with a reactor. This registration process, handled by [`Registration`], is
+//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point,
+//! the reactor starts listening for connection events from the operating system
+//! for that socket.
+//!
+//! Once the connect future is passed to [`tokio::run`], it is spawned onto a
+//! thread pool. The thread pool waits until it is notified that the connection
+//! has completed.
+//!
+//! When the TCP connection is established, the reactor receives an event from
+//! the operating system. It then notifies the thread pool, telling it that the
+//! connect future can complete. At this point, the thread pool will schedule
+//! the task to run on one of its worker threads. This results in the `and_then`
+//! closure to get executed.
+//!
+//! ## Eager registration
+//!
+//! Notice how the snippet does not explicitly reference a reactor. When
+//! [`TcpStream::connect`] is called, it registers the socket with the current
+//! reactor, but no reactor is specified. This works because a reactor
+//! instance is automatically made available when using the Tokio [runtime],
+//! which is done using [`tokio::main`]. The Tokio runtime's executor sets a
+//! thread-local variable referencing the associated [`Reactor`] instance and
+//! [`Handle::current`] (used by [`Registration`]) returns the reference.
+//!
+//! ## Implementation
+//!
+//! The reactor implementation uses [`mio`] to interface with the operating
+//! system's event queue. A call to [`Reactor::poll`] results in a single
+//! call to [`Poll::poll`] which in turn results in a single call to the
+//! operating system's selector.
+//!
+//! The reactor maintains state for each registered I/O resource. This tracks
+//! the executor task to notify when events are provided by the operating
+//! system's selector. This state is stored in a `Sync` data structure and
+//! referenced by [`Registration`]. When the [`Registration`] instance is
+//! dropped, this state is cleaned up. Because the state is stored in a `Sync`
+//! data structure, the [`Registration`] instance is able to be moved to other
+//! threads.
+//!
+//! By default, a runtime's default reactor runs on a background thread. This
+//! ensures that application code cannot significantly impact the reactor's
+//! responsiveness.
+//!
+//! ## Integrating with the reactor
+//!
+//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that
+//! automatically integrate with the reactor. However, library authors or
+//! applications may wish to implement their own resources that are also backed
+//! by the reactor.
+//!
+//! There are a couple of ways to do this.
+//!
+//! If the custom I/O resource implements [`mio::Evented`] and implements
+//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the
+//! most suited.
+//!
+//! Otherwise, [`Registration`] can be used directly. This provides the lowest
+//! level primitive needed for integrating with the reactor: a stream of
+//! readiness events.
+//!
+//! [`Reactor`]: struct.Reactor.html
+//! [`Registration`]: struct.Registration.html
+//! [runtime model]: https://tokio.rs/docs/internals/runtime-model/
+//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
+//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
+//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect
+//! [`connect`]: ../net/struct.TcpStream.html#method.connect
+//! [connect-future]: ../net/struct.ConnectFuture.html
+//! [`tokio::run`]: ../runtime/fn.run.html
+//! [`TcpStream`]: ../net/struct.TcpStream.html
+//! [runtime]: ../runtime
+//! [`Handle::current`]: struct.Handle.html#method.current
+//! [`mio`]: https://github.com/carllerche/mio
+//! [`Reactor::poll`]: struct.Reactor.html#method.poll
+//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll
+//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html
+//! [`PollEvented`]: struct.PollEvented.html
+//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
+//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
+
+pub(crate) mod platform;
+mod reactor;
+mod registration;
+
+pub use self::reactor::{set_default, DefaultGuard, Handle, Reactor};
+pub use self::registration::Registration;
diff --git a/tokio/src/net/driver/platform.rs b/tokio/src/net/driver/platform.rs
new file mode 100644
index 00000000..4cfe7345
--- /dev/null
+++ b/tokio/src/net/driver/platform.rs
@@ -0,0 +1,28 @@
+pub(crate) use self::sys::*;
+
+#[cfg(unix)]
+mod sys {
+ use mio::unix::UnixReady;
+ use mio::Ready;
+
+ pub(crate) fn hup() -> Ready {
+ UnixReady::hup().into()
+ }
+
+ pub(crate) fn is_hup(ready: Ready) -> bool {
+ UnixReady::from(ready).is_hup()
+ }
+}
+
+#[cfg(windows)]
+mod sys {
+ use mio::Ready;
+
+ pub(crate) fn hup() -> Ready {
+ Ready::empty()
+ }
+
+ pub(crate) fn is_hup(_: Ready) -> bool {
+ false
+ }
+}
diff --git a/tokio/src/net/driver/reactor.rs b/tokio/src/net/driver/reactor.rs
new file mode 100644
index 00000000..384abe47
--- /dev/null
+++ b/tokio/src/net/driver/reactor.rs
@@ -0,0 +1,451 @@
+use super::platform;
+
+use tokio_executor::park::{Park, Unpark};
+use tokio_sync::AtomicWaker;
+
+use mio::event::Evented;
+use slab::Slab;
+use std::cell::RefCell;
+use std::io;
+use std::marker::PhantomData;
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::{Arc, RwLock, Weak};
+use std::task::Waker;
+use std::time::Duration;
+use std::{fmt, usize};
+
+/// The core reactor, or event loop.
+///
+/// The event loop is the main source of blocking in an application which drives
+/// all other I/O events and notifications happening. Each event loop can have
+/// multiple handles pointing to it, each of which can then be used to create
+/// various I/O objects to interact with the event loop in interesting ways.
+pub struct Reactor {
+ /// Reuse the `mio::Events` value across calls to poll.
+ events: mio::Events,
+
+ /// State shared between the reactor and the handles.
+ inner: Arc<Inner>,
+
+ _wakeup_registration: mio::Registration,
+}
+
+/// A reference to a reactor.
+///
+/// A `Handle` is used for associating I/O objects with an event loop
+/// explicitly. Typically though you won't end up using a `Handle` that often
+/// and will instead use the default reactor for the execution context.
+#[derive(Clone)]
+pub struct Handle {
+ inner: Weak<Inner>,
+}
+
+/// Return value from the `turn` method on `Reactor`.
+///
+/// Currently this value doesn't actually provide any functionality, but it may
+/// in the future give insight into what happened during `turn`.
+#[derive(Debug)]
+pub struct Turn {
+ _priv: (),
+}
+
+pub(super) struct Inner {
+ /// The underlying system event queue.
+ io: mio::Poll,
+
+ /// ABA guard counter
+ next_aba_guard: AtomicUsize,
+
+ /// Dispatch slabs for I/O and futures events
+ pub(super) io_dispatch: RwLock<Slab<ScheduledIo>>,
+
+ /// Used to wake up the reactor from a call to `turn`
+ wakeup: mio::SetReadiness,
+}
+
+pub(super) struct ScheduledIo {
+ aba_guard: usize,
+ pub(super) readiness: AtomicUsize,
+ pub(super) reader: AtomicWaker,
+ pub(super) writer: AtomicWaker,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub(super) enum Direction {
+ Read,
+ Write,
+}
+
+thread_local! {
+ /// Tracks the reactor for the current execution context.
+ static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None)
+}
+
+const TOKEN_SHIFT: usize = 22;
+
+// Kind of arbitrary, but this reserves some token space for later usage.
+const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1;
+const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES);
+
+fn _assert_kinds() {
+ fn _assert<T: Send + Sync>() {}
+
+ _assert::<Handle>();
+}
+
+// ===== impl Reactor =====
+
+#[derive(Debug)]
+/// Guard that resets current reactor on drop.
+pub struct DefaultGuard<'a> {
+ _lifetime: PhantomData<&'a u8>,
+}
+
+impl Drop for DefaultGuard<'_> {
+ fn drop(&mut self) {
+ CURRENT_REACTOR.with(|current| {
+ let mut current = current.borrow_mut();
+ *current = None;
+ });
+ }
+}
+
+/// Sets handle for a default reactor, returning guard that unsets it on drop.
+pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
+ CURRENT_REACTOR.with(|current| {
+ let mut current = current.borrow_mut();
+
+ assert!(
+ current.is_none(),
+ "default Tokio reactor already set \
+ for execution context"
+ );
+
+ *current = Some(handle.clone());
+ });
+
+ DefaultGuard {
+ _lifetime: PhantomData,
+ }
+}
+
+impl Reactor {
+ /// Creates a new event loop, returning any error that happened during the
+ /// creation.
+ pub fn new() -> io::Result<Reactor> {
+ let io = mio::Poll::new()?;
+ let wakeup_pair = mio::Registration::new2();
+
+ io.register(
+ &wakeup_pair.0,
+ TOKEN_WAKEUP,
+ mio::Ready::readable(),
+ mio::PollOpt::level(),
+ )?;
+
+ Ok(Reactor {
+ events: mio::Events::with_capacity(1024),
+ _wakeup_registration: wakeup_pair.0,
+ inner: Arc::new(Inner {
+ io,
+ next_aba_guard: AtomicUsize::new(0),
+ io_dispatch: RwLock::new(Slab::with_capacity(1)),
+ wakeup: wakeup_pair.1,
+ }),
+ })
+ }
+
+ /// Returns a handle to this event loop which can be sent across threads
+ /// and can be used as a proxy to the event loop itself.
+ ///
+ /// Handles are cloneable and clones always refer to the same event loop.
+ /// This handle is typically passed into functions that create I/O objects
+ /// to bind them to this event loop.
+ pub fn handle(&self) -> Handle {
+ Handle {
+ inner: Arc::downgrade(&self.inner),
+ }
+ }
+
+ /// Performs one iteration of the event loop, blocking on waiting for events
+ /// for at most `max_wait` (forever if `None`).
+ ///
+ /// This method is the primary method of running this reactor and processing
+ /// I/O events that occur. This method executes one iteration of an event
+ /// loop, blocking at most once waiting for events to happen.
+ ///
+ /// If a `max_wait` is specified then the method should block no longer than
+ /// the duration specified, but this shouldn't be used as a super-precise
+ /// timer but rather a "ballpark approximation"
+ ///
+ /// # Return value
+ ///
+ /// This function returns an instance of `Turn`
+ ///
+ /// `Turn` as of today has no extra information with it and can be safely
+ /// discarded. In the future `Turn` may contain information about what
+ /// happened while this reactor blocked.
+ ///
+ /// # Errors
+ ///
+ /// This function may also return any I/O error which occurs when polling
+ /// for readiness of I/O objects with the OS. This is quite unlikely to
+ /// arise and typically mean that things have gone horribly wrong at that
+ /// point. Currently this is primarily only known to happen for internal
+ /// bugs to `tokio` itself.
+ pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
+ self.poll(max_wait)?;
+ Ok(Turn { _priv: () })
+ }
+
+ /// Returns true if the reactor is currently idle.
+ ///
+ /// Idle is defined as all tasks that have been spawned have completed,
+ /// either successfully or with an error.
+ pub fn is_idle(&self) -> bool {
+